Skip to content

Commit

Permalink
Merge pull request #137 from libp2p/0.5.6
Browse files Browse the repository at this point in the history
Release 0.5.6
#136 (Fix the panic when SemiDuplex has no outbound stream yet)
  • Loading branch information
Nashatyrev authored Sep 1, 2020
2 parents 3fd9dd9 + 152abde commit 8a6d677
Show file tree
Hide file tree
Showing 9 changed files with 126 additions and 8 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ Builds are published to JCenter. Maven Central mirrors JCenter, but updates can
<dependency>
<groupId>io.libp2p</groupId>
<artifactId>jvm-libp2p-minimal</artifactId>
<version>0.5.5-RELEASE</version>
<version>0.5.6-RELEASE</version>
<type>pom</type>
</dependency>
```
Expand Down
2 changes: 1 addition & 1 deletion build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import java.nio.file.Paths
// ./gradlew bintrayUpload -PbintrayUser=<user> -PbintrayApiKey=<api-key>

group = "io.libp2p"
version = "0.5.5-RELEASE"
version = "0.5.6-RELEASE"
description = "a minimal implementation of libp2p for the jvm"

plugins {
Expand Down
8 changes: 7 additions & 1 deletion src/main/kotlin/io/libp2p/core/Libp2pException.kt
Original file line number Diff line number Diff line change
Expand Up @@ -64,4 +64,10 @@ open class TransportNotSupportedException(message: String) : Libp2pException(mes
/**
* Indicates the message received from a remote party violates protocol
*/
open class ProtocolViolationException(message: String) : Libp2pException(message)
open class ProtocolViolationException(message: String) : Libp2pException(message)

/**
* When trying to write a message to a peer within [io.libp2p.etc.util.P2PServiceSemiDuplex]
* but there is no yet outbound stream created.
*/
open class SemiDuplexNoOutboundStreamException(message: String) : Libp2pException(message)
2 changes: 1 addition & 1 deletion src/main/kotlin/io/libp2p/etc/types/AsyncExt.kt
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ fun <C> anyComplete(vararg all: CompletableFuture<C>): CompletableFuture<C> {
return if (all.isEmpty()) completedExceptionally(NothingToCompleteException())
else object : CompletableFuture<C>() {
init {
val counter = AtomicInteger(all.size)
all.forEach { it.whenComplete { v, t ->
if (t == null) {
complete(v)
Expand All @@ -70,7 +71,6 @@ fun <C> anyComplete(vararg all: CompletableFuture<C>): CompletableFuture<C> {
}
} }
}
val counter = AtomicInteger(all.size)
}
}

Expand Down
6 changes: 4 additions & 2 deletions src/main/kotlin/io/libp2p/etc/util/P2PServiceSemiDuplex.kt
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
package io.libp2p.etc.util

import io.libp2p.core.BadPeerException
import io.libp2p.core.InternalErrorException
import io.libp2p.core.SemiDuplexNoOutboundStreamException
import io.libp2p.etc.types.completedExceptionally
import io.libp2p.etc.types.toVoidCompletableFuture
import java.util.concurrent.CompletableFuture

Expand All @@ -17,7 +18,8 @@ abstract class P2PServiceSemiDuplex : P2PService() {
var otherStreamHandler: StreamHandler? = null

override fun writeAndFlush(msg: Any): CompletableFuture<Unit> =
getOutboundHandler()?.ctx?.writeAndFlush(msg)?.toVoidCompletableFuture() ?: throw InternalErrorException("No active outbound stream to write data $msg")
getOutboundHandler()?.ctx?.writeAndFlush(msg)?.toVoidCompletableFuture() ?: completedExceptionally(
SemiDuplexNoOutboundStreamException("No active outbound stream to write data $msg"))

override fun isActive() = getOutboundHandler()?.ctx != null

Expand Down
2 changes: 0 additions & 2 deletions src/main/kotlin/io/libp2p/pubsub/Errors.kt
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,3 @@ class MessageAlreadySeenException(message: String) : PubsubException(message)
* Throw when message validation failed
*/
class InvalidMessageException(message: String) : PubsubException(message)

class InternalError
8 changes: 8 additions & 0 deletions src/test/kotlin/io/libp2p/etc/types/AsyncExtTest.kt
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.libp2p.etc.types

import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.Test
import java.util.concurrent.CompletableFuture

Expand All @@ -18,4 +19,11 @@ class AsyncExtTest {
assert(allFut.isDone)
assert(allFut.get() == 6)
}

@Test
fun testAnyCompleteWithCompletedExceptionally() {
val anyComplete = anyComplete<Int>(completedExceptionally(RuntimeException("test")))

Assertions.assertTrue(anyComplete.isCompletedExceptionally)
}
}
72 changes: 72 additions & 0 deletions src/test/kotlin/io/libp2p/pubsub/gossip/GossipPubsubRouterTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,16 @@ package io.libp2p.pubsub.gossip

import io.libp2p.etc.types.seconds
import io.libp2p.pubsub.DeterministicFuzz
import io.libp2p.pubsub.MockRouter
import io.libp2p.pubsub.PubsubRouterTest
import io.libp2p.pubsub.TestRouter
import io.libp2p.tools.TestLogAppender
import io.netty.handler.logging.LogLevel
import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.Test
import pubsub.pb.Rpc
import java.time.Duration
import java.util.concurrent.TimeUnit

class GossipPubsubRouterTest : PubsubRouterTest({
GossipRouter(
Expand Down Expand Up @@ -100,4 +104,72 @@ class GossipPubsubRouterTest : PubsubRouterTest({
Assertions.assertEquals(receiveRouters.size, msgCount3)
receiveRouters.forEach { it.inboundMessages.clear() }
}

@Test
fun testOneWayConnect() {
// when remote gossip makes connection and immediately send IHAVE
// the situation when we fail to send IWANT (as not outbound stream yet)
// shouldn't be treated as internal error and no WARN logs should be printed
val fuzz = DeterministicFuzz()

val router1 = fuzz.createTestRouter(MockRouter())
val router2 = fuzz.createTestRouter(router())
val mockRouter = router1.router as MockRouter

router2.router.subscribe("topic1")
router1.connect(router2, LogLevel.INFO, LogLevel.INFO)

TestLogAppender().install().use { testLogAppender ->
val msg1 = Rpc.RPC.newBuilder()
.setControl(
Rpc.ControlMessage.newBuilder().addIhave(
Rpc.ControlIHave.newBuilder().addMessageIDs("messageId")
)
).build()

mockRouter.sendToSingle(msg1)

Assertions.assertFalse(testLogAppender.hasAnyWarns())
}
}

@Test
fun testOneWayConnectPublish() {
// check that the published message is broadcasted successfully when one
// of gossip peers is yet 'partially' connected
val fuzz = DeterministicFuzz()

val router1 = fuzz.createTestRouter(MockRouter())
val router2 = fuzz.createTestRouter(router())
val router3 = fuzz.createTestRouter(router())
val mockRouter = router1.router as MockRouter

router2.router.subscribe("topic1")
router3.router.subscribe("topic1")
router1.connect(router2, LogLevel.INFO, LogLevel.INFO)
router2.connectSemiDuplex(router3, LogLevel.INFO, LogLevel.INFO)

TestLogAppender().install().use { testLogAppender ->

val msg1 = Rpc.RPC.newBuilder()
.addSubscriptions(Rpc.RPC.SubOpts.newBuilder()
.setTopicid("topic1")
.setSubscribe(true))
.setControl(
Rpc.ControlMessage.newBuilder().addGraft(
Rpc.ControlGraft.newBuilder().setTopicID("topic1")
))
.build()
mockRouter.sendToSingle(msg1)

fuzz.timeController.addTime(3.seconds)

val msg2 = newMessage("topic1", 1L, "Hello".toByteArray())
val future = router2.router.publish(msg2)
Assertions.assertDoesNotThrow { future.get(1, TimeUnit.SECONDS) }
Assertions.assertEquals(1, router3.inboundMessages.size)

Assertions.assertFalse(testLogAppender.hasAnyWarns())
}
}
}
32 changes: 32 additions & 0 deletions src/test/kotlin/io/libp2p/tools/TestLogAppender.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package io.libp2p.tools

import org.apache.logging.log4j.Level
import org.apache.logging.log4j.LogManager
import org.apache.logging.log4j.core.LogEvent
import org.apache.logging.log4j.core.Logger
import org.apache.logging.log4j.core.appender.AbstractAppender
import java.util.ArrayList

class TestLogAppender : AbstractAppender("test", null, null), AutoCloseable {
val logs: MutableList<LogEvent> = ArrayList()

fun install(): TestLogAppender {
(LogManager.getRootLogger() as Logger).addAppender(this)
return this
}

fun uninstall() {
(LogManager.getRootLogger() as Logger).removeAppender(this)
}

override fun close() {
uninstall()
}

fun hasAny(level: Level) = logs.any { it.level == level }
fun hasAnyWarns() = hasAny(Level.ERROR) || hasAny(Level.WARN)

override fun append(event: LogEvent) {
logs += event.toImmutable()
}
}

0 comments on commit 8a6d677

Please sign in to comment.