Skip to content

Commit

Permalink
Merge pull request #394 from libp2p/1.2.1
Browse files Browse the repository at this point in the history
1.2.1 release
  • Loading branch information
tbenr authored Oct 24, 2024
2 parents fe666bf + 1cde874 commit c825581
Show file tree
Hide file tree
Showing 12 changed files with 321 additions and 74 deletions.
27 changes: 17 additions & 10 deletions libp2p/src/main/kotlin/io/libp2p/pubsub/AbstractRouter.kt
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ abstract class AbstractRouter(

/**
* Flushes all pending message parts for all peers
* @see addPendingRpcPart
*/
protected fun flushAllPending() {
pendingRpcParts.pendingPeers.forEach(::flushPending)
Expand Down Expand Up @@ -163,7 +162,7 @@ abstract class AbstractRouter(

// Validate message
if (!validateMessageListLimits(msg)) {
logger.debug("Dropping msg with lists exceeding limits from peer $peer")
logger.debug("Dropping msg with lists exceeding limits from peer {}", peer)
return
}

Expand All @@ -173,7 +172,7 @@ abstract class AbstractRouter(
.filterIncomingSubscriptions(subscriptions, peersTopics.getByFirst(peer))
.forEach { handleMessageSubscriptions(peer, it) }
} catch (e: Exception) {
logger.debug("Subscription filter error, ignoring message from peer $peer", e)
logger.debug("Subscription filter error, ignoring message from peer {}", peer, e)
return
}

Expand All @@ -182,7 +181,7 @@ abstract class AbstractRouter(
}

val (msgSubscribed, nonSubscribed) = msg.publishList
.partition { it.topicIDsList.any { it in subscribedTopics } }
.partition { rpcMsg -> rpcMsg.topicIDsList.any { it in subscribedTopics } }

nonSubscribed.forEach { notifyNonSubscribedMessage(peer, it) }

Expand All @@ -207,7 +206,7 @@ abstract class AbstractRouter(
messageValidator.validate(it)
true
} catch (e: Exception) {
logger.debug("Invalid pubsub message from peer $peer: $it", e)
logger.debug("Invalid pubsub message from peer {}: {}", peer, it, e)
seenMessages[it] = Optional.of(ValidationResult.Invalid)
notifyUnseenInvalidMessage(peer, it)
false
Expand Down Expand Up @@ -248,8 +247,16 @@ abstract class AbstractRouter(
{ res, err ->
when {
err != null -> logger.warn("Exception while handling message from peer $peer: ${it.first}", err)
res == ValidationResult.Invalid -> logger.debug("Invalid pubsub message from peer $peer: ${it.first}")
res == ValidationResult.Ignore -> logger.trace("Ignoring pubsub message from peer $peer: ${it.first}")
res == ValidationResult.Invalid -> logger.debug(
"Invalid pubsub message from peer {}: {}",
peer,
it.first
)
res == ValidationResult.Ignore -> logger.trace(
"Ignoring pubsub message from peer {}: {}",
peer,
it.first
)
else -> {
newValidatedMessages(singletonList(it.first), peer)
flushAllPending()
Expand All @@ -273,15 +280,15 @@ abstract class AbstractRouter(

override fun onPeerWireException(peer: PeerHandler?, cause: Throwable) {
// exception occurred in protobuf decoders
logger.debug("Malformed message from $peer : $cause")
logger.debug("Malformed message from {} : {}", peer, cause)
peer?.also { notifyMalformedMessage(it) }
}

override fun onServiceException(peer: PeerHandler?, msg: Any?, cause: Throwable) {
if (cause is BadPeerException) {
logger.debug("Remote peer ($peer) misbehaviour on message $msg: $cause")
logger.debug("Remote peer ({}) misbehaviour on message {} : {}", peer, msg, cause)
} else {
logger.warn("AbstractRouter internal error on message $msg from peer $peer", cause)
logger.warn("AbstractRouter internal error on message {} from peer {}", msg, peer, cause)
}
}

Expand Down
3 changes: 3 additions & 0 deletions libp2p/src/main/kotlin/io/libp2p/pubsub/PubsubRouter.kt
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ interface PubsubMessage {
val topics: List<Topic>
get() = protobufMessage.topicIDsList

val size: Int
get() = protobufMessage.data.size()

fun messageSha256() = sha256(protobufMessage.toByteArray())

override fun equals(other: Any?): Boolean
Expand Down
23 changes: 17 additions & 6 deletions libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipParams.kt
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ fun defaultDLazy(D: Int) = D
fun defaultDScore(D: Int) = D * 2 / 3
fun defaultDOut(D: Int, DLow: Int) = min(D / 2, max(DLow - 1, 0))

// floodPublishMaxMessageSizeThreshold shortcuts
const val NEVER_FLOOD_PUBLISH = 0
const val ALWAYS_FLOOD_PUBLISH = Int.MAX_VALUE

/**
* Parameters of Gossip 1.1 router
*/
Expand Down Expand Up @@ -112,11 +116,16 @@ data class GossipParams(
val seenTTL: Duration = 2.minutes,

/**
* [floodPublish] is a gossipsub router option that enables flood publishing.
* When this is enabled, published messages are forwarded to all peers with score >=
* to publishThreshold
* [floodPublishMaxMessageSizeThreshold] controls the maximum size (in bytes) a message will be
* published using flood publishing mode.
* When a message size is <= [floodPublishMaxMessageSizeThreshold], published messages are forwarded
* to all peers with score >= to [GossipScoreParams.publishThreshold]
*
* [NEVER_FLOOD_PUBLISH] and [ALWAYS_FLOOD_PUBLISH] can be used as shortcuts.
*
* The default is [NEVER_FLOOD_PUBLISH] (0 KiB).
*/
val floodPublish: Boolean = false,
val floodPublishMaxMessageSizeThreshold: Int = NEVER_FLOOD_PUBLISH,

/**
* [gossipFactor] affects how many peers we will emit gossip to at each heartbeat.
Expand Down Expand Up @@ -240,9 +249,9 @@ data class GossipParams(

/**
* [iDontWantMinMessageSizeThreshold] controls the minimum size (in bytes) that an incoming message needs to be so that an IDONTWANT message is sent to mesh peers.
* The default is 16 KB.
* The default is 16 KiB.
*/
val iDontWantMinMessageSizeThreshold: Int = 16000,
val iDontWantMinMessageSizeThreshold: Int = 16384,

/**
* [iDontWantTTL] Expiry time for cache of received IDONTWANT messages for peers
Expand All @@ -260,6 +269,8 @@ data class GossipParams(
check(DLow <= D, "DLow should be <= D")
check(DHigh >= D, "DHigh should be >= D")
check(gossipFactor in 0.0..1.0, "gossipFactor should be in range [0.0, 1.0]")
check(floodPublishMaxMessageSizeThreshold >= 0, "floodPublishMaxMessageSizeThreshold should be >= 0")
check(iDontWantMinMessageSizeThreshold >= 0, "iDontWantMinMessageSizeThreshold should be >= 0")
}

companion object {
Expand Down
66 changes: 51 additions & 15 deletions libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt
Original file line number Diff line number Diff line change
Expand Up @@ -281,17 +281,21 @@ open class GossipRouter(
when {
isDirect(peer) ->
prune(peer, topic)

isBackOff(peer, topic) -> {
notifyRouterMisbehavior(peer, 1)
if (isBackOffFlood(peer, topic)) {
notifyRouterMisbehavior(peer, 1)
}
prune(peer, topic)
}

score.score(peer.peerId) < 0 ->
prune(peer, topic)

meshPeers.size >= params.DHigh && !peer.isOutbound() ->
prune(peer, topic)

peer !in meshPeers ->
graft(peer, topic)
}
Expand Down Expand Up @@ -400,31 +404,63 @@ open class GossipRouter(
override fun broadcastOutbound(msg: PubsubMessage): CompletableFuture<Unit> {
msg.topics.forEach { lastPublished[it] = currentTimeSupplier() }

val floodPublish = msg.size <= params.floodPublishMaxMessageSizeThreshold

val peers =
if (params.floodPublish) {
if (floodPublish) {
msg.topics
.flatMap { getTopicPeers(it) }
.filter { score.score(it.peerId) >= scoreParams.publishThreshold }
.plus(getDirectPeers())
} else {
msg.topics
.mapNotNull { topic ->
mesh[topic] ?: fanout[topic] ?: getTopicPeers(topic).shuffled(random).take(params.D)
.also {
if (it.isNotEmpty()) fanout[topic] = it.toMutableSet()
.map { topic ->
val topicMeshPeers = mesh[topic]
if (topicMeshPeers != null) {
// we are subscribed to the topic
if (topicMeshPeers.size < params.D) {
// we need extra non-mesh peers for more reliable publishing
val nonMeshTopicPeers = getTopicPeers(topic) - topicMeshPeers
val (nonMeshTopicPeersAbovePublishThreshold, nonMeshTopicPeersBelowPublishThreshold) =
nonMeshTopicPeers.partition { score.score(it.peerId) >= scoreParams.publishThreshold }
// this deviates from the original spec but we want at least D peers for publishing
// prioritizing mesh peers, then non-mesh peers with acceptable score,
// and then underscored non-mesh peers as a last resort
listOf(
topicMeshPeers,
nonMeshTopicPeersAbovePublishThreshold.shuffled(random),
nonMeshTopicPeersBelowPublishThreshold.shuffled(random)
)
.flatten()
.take(params.D)
} else {
topicMeshPeers
}
} else {
// we are not subscribed to the topic
fanout[topic] ?: getTopicPeers(topic).shuffled(random).take(params.D)
.also {
if (it.isNotEmpty()) fanout[topic] = it.toMutableSet()
}
}
}
.flatten()
}
val list = peers
.filterNot { peerDoesNotWantMessage(it, msg.messageId) }
.map { submitPublishMessage(it, msg) }

mCache += msg
flushAllPending()

return if (list.isNotEmpty()) {
anyComplete(list)
return if (peers.isNotEmpty()) {
iDontWant(msg)
val publishedMessages = peers
.filterNot { peerDoesNotWantMessage(it, msg.messageId) }
.map { submitPublishMessage(it, msg) }
if (publishedMessages.isEmpty()) {
// all peers have sent IDONTWANT for this message id
CompletableFuture.completedFuture(Unit)
} else {
flushAllPending()
anyComplete(publishedMessages)
}
} else {
completedExceptionally(
NoPeersForOutboundMessageException("No peers for message topics ${msg.topics} found")
Expand Down Expand Up @@ -605,16 +641,16 @@ open class GossipRouter(
enqueueIwant(peer, messageIds)
}

private fun iDontWant(msg: PubsubMessage, receivedFrom: PeerHandler) {
private fun iDontWant(msg: PubsubMessage, receivedFrom: PeerHandler? = null) {
if (!this.protocol.supportsIDontWant()) return
if (msg.protobufMessage.data.size() < params.iDontWantMinMessageSizeThreshold) return
if (msg.size < params.iDontWantMinMessageSizeThreshold) return
// we need to send IDONTWANT messages to mesh peers immediately in order for them to have an effect
msg.topics
.mapNotNull { mesh[it] }
.flatten()
.distinct()
.minus(receivedFrom)
.forEach { peer -> sendIdontwant(peer, msg.messageId) }
.minus(setOfNotNull(receivedFrom))
.forEach { sendIdontwant(it, msg.messageId) }
}

private fun enqueuePrune(peer: PeerHandler, topic: Topic) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,6 @@ class GossipParamsBuilder {

private var pruneBackoff: Duration? = null

private var floodPublish: Boolean? = null

private var gossipFactor: Double? = null

private var opportunisticGraftPeers: Int? = null
Expand Down Expand Up @@ -76,6 +74,8 @@ class GossipParamsBuilder {

private var iDontWantMinMessageSizeThreshold: Int? = null

private var floodPublishMaxMessageSizeThreshold: Int? = null

private var iDontWantTTL: Duration? = null

init {
Expand All @@ -90,7 +90,7 @@ class GossipParamsBuilder {
this.maxPeersSentInPruneMsg = source.maxPeersSentInPruneMsg
this.maxPeersAcceptedInPruneMsg = source.maxPeersAcceptedInPruneMsg
this.pruneBackoff = source.pruneBackoff
this.floodPublish = source.floodPublish
this.floodPublishMaxMessageSizeThreshold = source.floodPublishMaxMessageSizeThreshold
this.gossipFactor = source.gossipFactor
this.opportunisticGraftPeers = source.opportunisticGraftPeers
this.opportunisticGraftTicks = source.opportunisticGraftTicks
Expand Down Expand Up @@ -141,8 +141,6 @@ class GossipParamsBuilder {

fun pruneBackoff(value: Duration): GossipParamsBuilder = apply { pruneBackoff = value }

fun floodPublish(value: Boolean): GossipParamsBuilder = apply { floodPublish = value }

fun gossipFactor(value: Double): GossipParamsBuilder = apply { gossipFactor = value }

fun opportunisticGraftPeers(value: Int): GossipParamsBuilder = apply {
Expand Down Expand Up @@ -185,6 +183,8 @@ class GossipParamsBuilder {

fun iDontWantMinMessageSizeThreshold(value: Int): GossipParamsBuilder = apply { iDontWantMinMessageSizeThreshold = value }

fun floodPublishMaxMessageSizeThreshold(value: Int): GossipParamsBuilder = apply { floodPublishMaxMessageSizeThreshold = value }

fun iDontWantTTL(value: Duration): GossipParamsBuilder = apply { iDontWantTTL = value }

fun build(): GossipParams {
Expand All @@ -203,7 +203,7 @@ class GossipParamsBuilder {
gossipHistoryLength = gossipHistoryLength!!,
heartbeatInterval = heartbeatInterval!!,
seenTTL = seenTTL!!,
floodPublish = floodPublish!!,
floodPublishMaxMessageSizeThreshold = floodPublishMaxMessageSizeThreshold!!,
gossipFactor = gossipFactor!!,
opportunisticGraftPeers = opportunisticGraftPeers!!,
opportunisticGraftTicks = opportunisticGraftTicks!!,
Expand Down Expand Up @@ -252,7 +252,7 @@ class GossipParamsBuilder {
check(seenTTL != null, { "seenTTL must not be null" })
check(maxPeersSentInPruneMsg != null, { "maxPeersSentInPruneMsg must not be null" })
check(pruneBackoff != null, { "pruneBackoff must not be null" })
check(floodPublish != null, { "floodPublish must not be null" })
check(floodPublishMaxMessageSizeThreshold != null, { "floodPublishMaxMessageSizeThreshold must not be null" })
check(gossipFactor != null, { "gossipFactor must not be null" })
check(opportunisticGraftPeers != null, { "opportunisticGraftPeers must not be null" })
check(opportunisticGraftTicks != null, { "opportunisticGraftTicks must not be null" })
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import java.util.concurrent.TimeUnit

class GossipPubsubRouterTest : PubsubRouterTest(
createGossipFuzzRouterFactory {
GossipRouterBuilder(params = GossipParams(3, 3, 100, floodPublish = false))
GossipRouterBuilder(params = GossipParams(3, 3, 100, floodPublishMaxMessageSizeThreshold = NEVER_FLOOD_PUBLISH))
}
) {

Expand Down Expand Up @@ -59,7 +59,7 @@ class GossipPubsubRouterTest : PubsubRouterTest(
// this is to test ihave/iwant
fuzz.timeController.addTime(Duration.ofMillis(1))

val r = { GossipRouterBuilder(params = GossipParams(3, 3, 3, DOut = 0, DLazy = 1000, floodPublish = false)) }
val r = { GossipRouterBuilder(params = GossipParams(3, 3, 3, DOut = 0, DLazy = 1000, floodPublishMaxMessageSizeThreshold = NEVER_FLOOD_PUBLISH)) }
val routerCenter = fuzz.createTestGossipRouter(r)
allRouters.add(0, routerCenter)

Expand Down
Loading

0 comments on commit c825581

Please sign in to comment.