Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add propagator in MessageApi when subscribed to a pubsub topic #354

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions libp2p/src/main/kotlin/io/libp2p/core/pubsub/PubsubApi.kt
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,11 @@ interface PubsubApi : PubsubSubscriberApi {
*/
interface MessageApi {

/**
* The identity of the peer who has propagated this message
*/
val propagator: PeerId

val originalMessage: PubsubMessage

/**
Expand Down
9 changes: 5 additions & 4 deletions libp2p/src/main/kotlin/io/libp2p/pubsub/AbstractRouter.kt
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import java.util.function.Consumer
// 1 MB default max message size
const val DEFAULT_MAX_PUBSUB_MESSAGE_SIZE = 1 shl 20

typealias PubsubMessageHandler = (PubsubMessage) -> CompletableFuture<ValidationResult>
typealias PubsubMessageHandler = (PeerId, PubsubMessage) -> CompletableFuture<ValidationResult>

open class DefaultPubsubMessage(override val protobufMessage: Rpc.Message) : AbstractPubsubMessage() {
override val messageId: MessageId = protobufMessage.from.toWBytes() + protobufMessage.seqno.toWBytes()
Expand All @@ -44,7 +44,8 @@ abstract class AbstractRouter(
protected val messageValidator: PubsubRouterMessageValidator
) : P2PServiceSemiDuplex(executor), PubsubRouter, PubsubRouterDebug {

protected var msgHandler: PubsubMessageHandler = { throw IllegalStateException("Message handler is not initialized for PubsubRouter") }
protected var msgHandler: PubsubMessageHandler =
{ _, _ -> throw IllegalStateException("Message handler is not initialized for PubsubRouter") }

protected open val peersTopics = mutableMultiBiMap<PeerHandler, Topic>()
protected open val subscribedTopics = linkedSetOf<Topic>()
Expand Down Expand Up @@ -216,7 +217,7 @@ abstract class AbstractRouter(
}
}

val validFuts = msgValid.map { it to msgHandler(it) }
val validFuts = msgValid.map { it to msgHandler(peer.peerId, it) }
val doneUndone = validFuts.groupBy { it.second.isDone }
val done = doneUndone.getOrDefault(true, emptyList())
val undone = doneUndone.getOrDefault(false, emptyList())
Expand Down Expand Up @@ -331,7 +332,7 @@ abstract class AbstractRouter(
return peer.writeAndFlush(msg)
}

override fun initHandler(handler: (PubsubMessage) -> CompletableFuture<ValidationResult>) {
override fun initHandler(handler: (PeerId, PubsubMessage) -> CompletableFuture<ValidationResult>) {
msgHandler = handler
}
}
10 changes: 5 additions & 5 deletions libp2p/src/main/kotlin/io/libp2p/pubsub/PubsubApiImpl.kt
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ open class PubsubApiImpl(val router: PubsubRouter) : PubsubApi {
}

init {
router.initHandler { onNewMessage(it) }
router.initHandler { peerId, msg -> onNewMessage(peerId, msg) }
}

val subscriptions: MutableMap<Topic, MutableList<SubscriptionImpl>> = mutableMapOf()
Expand All @@ -74,11 +74,11 @@ open class PubsubApiImpl(val router: PubsubRouter) : PubsubApi {
}
}

private fun onNewMessage(msg: PubsubMessage): CompletableFuture<ValidationResult> {
private fun onNewMessage(peerId: PeerId, msg: PubsubMessage): CompletableFuture<ValidationResult> {
val validationFuts = synchronized(this) {
msg.topics.mapNotNull { subscriptions[Topic(it)] }.flatten().distinct()
}.map {
it.receiver.apply(rpc2Msg(msg))
it.receiver.apply(rpc2Msg(peerId, msg))
}
return validationFuts.thenApplyAll {
if (it.isEmpty()) {
Expand All @@ -89,7 +89,7 @@ open class PubsubApiImpl(val router: PubsubRouter) : PubsubApi {
}
}

private fun rpc2Msg(msg: PubsubMessage) = MessageImpl(msg)
private fun rpc2Msg(peerId: PeerId, msg: PubsubMessage) = MessageImpl(peerId, msg)

override fun subscribe(receiver: Validator, vararg topics: Topic): PubsubSubscription {
val subscription = SubscriptionImpl(topics, receiver)
Expand Down Expand Up @@ -138,7 +138,7 @@ open class PubsubApiImpl(val router: PubsubRouter) : PubsubApi {
PublisherImpl(privKey, seqIdGenerator)
}

class MessageImpl(override val originalMessage: PubsubMessage) : MessageApi {
class MessageImpl(override val propagator: PeerId, override val originalMessage: PubsubMessage) : MessageApi {
private val msg = originalMessage.protobufMessage
override val data = msg.data.toByteArray().toByteBuf()
override val from = if (msg.hasFrom()) msg.from.toByteArray() else null
Expand Down
2 changes: 1 addition & 1 deletion libp2p/src/main/kotlin/io/libp2p/pubsub/PubsubRouter.kt
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ interface PubsubMessageRouter {
* All the messages received by the router are forwarded to the [handler] independently
* of any client subscriptions. Is it up to the client API to sort out subscriptions
*/
fun initHandler(handler: (PubsubMessage) -> CompletableFuture<ValidationResult>)
fun initHandler(handler: (PeerId, PubsubMessage) -> CompletableFuture<ValidationResult>)

/**
* Notifies the router that a client wants to receive messages on the following topics
Expand Down
4 changes: 2 additions & 2 deletions libp2p/src/test/java/io/libp2p/pubsub/GossipApiTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ public void testFastMessageId() throws Exception {

BlockingQueue<PubsubMessage> messages = new LinkedBlockingQueue<>();
router.initHandler(
m -> {
messages.add(m);
(__, msg) -> {
messages.add(msg);
return CompletableFuture.completedFuture(ValidationResult.Valid);
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,7 @@ class GossipV1_1Tests {
fun testAppValidatorScore() {
val test = TwoRoutersTest()
val validator = AtomicReference<CompletableFuture<ValidationResult>>(RESULT_VALID)
test.gossipRouter.initHandler { validator.get() }
test.gossipRouter.initHandler { _, _ -> validator.get() }

test.mockRouter.subscribe("topic1")
test.gossipRouter.subscribe("topic1")
Expand Down Expand Up @@ -860,7 +860,7 @@ class GossipV1_1Tests {
fun testValidatorIgnoreResult() {
val test = ManyRoutersTest(mockRouterCount = 2)
val validator = AtomicReference<CompletableFuture<ValidationResult>>(RESULT_VALID)
test.gossipRouter.initHandler { validator.get() }
test.gossipRouter.initHandler { _, _ -> validator.get() }
test.connectAll()
test.gossipRouter.subscribe("topic1")
test.routers.forEach { it.router.subscribe("topic1") }
Expand Down
4 changes: 2 additions & 2 deletions libp2p/src/testFixtures/kotlin/io/libp2p/pubsub/TestRouter.kt
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ class TestRouter(

val inboundMessages = LinkedBlockingQueue<PubsubMessage>()
var handlerValidationResult = RESULT_VALID
val routerHandler: (PubsubMessage) -> CompletableFuture<ValidationResult> = {
inboundMessages += it
val routerHandler: (PeerId, PubsubMessage) -> CompletableFuture<ValidationResult> = { _, msg ->
inboundMessages += msg
handlerValidationResult
}

Expand Down
Loading