Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix the panic when SemiDuplex has no outbound stream yet #136

Merged
merged 3 commits into from
Aug 31, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion src/main/kotlin/io/libp2p/core/Libp2pException.kt
Original file line number Diff line number Diff line change
Expand Up @@ -64,4 +64,10 @@ open class TransportNotSupportedException(message: String) : Libp2pException(mes
/**
* Indicates the message received from a remote party violates protocol
*/
open class ProtocolViolationException(message: String) : Libp2pException(message)
open class ProtocolViolationException(message: String) : Libp2pException(message)

/**
* When trying to write a message to a peer within [io.libp2p.etc.util.P2PServiceSemiDuplex]
* but there is no yet outbound stream created.
*/
open class SemiDuplexNoOutboundStreamException(message: String) : Libp2pException(message)
2 changes: 1 addition & 1 deletion src/main/kotlin/io/libp2p/etc/types/AsyncExt.kt
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ fun <C> anyComplete(vararg all: CompletableFuture<C>): CompletableFuture<C> {
return if (all.isEmpty()) completedExceptionally(NothingToCompleteException())
else object : CompletableFuture<C>() {
init {
val counter = AtomicInteger(all.size)
all.forEach { it.whenComplete { v, t ->
if (t == null) {
complete(v)
Expand All @@ -70,7 +71,6 @@ fun <C> anyComplete(vararg all: CompletableFuture<C>): CompletableFuture<C> {
}
} }
}
val counter = AtomicInteger(all.size)
}
}

Expand Down
6 changes: 4 additions & 2 deletions src/main/kotlin/io/libp2p/etc/util/P2PServiceSemiDuplex.kt
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
package io.libp2p.etc.util

import io.libp2p.core.BadPeerException
import io.libp2p.core.InternalErrorException
import io.libp2p.core.SemiDuplexNoOutboundStreamException
import io.libp2p.etc.types.completedExceptionally
import io.libp2p.etc.types.toVoidCompletableFuture
import java.util.concurrent.CompletableFuture

Expand All @@ -17,7 +18,8 @@ abstract class P2PServiceSemiDuplex : P2PService() {
var otherStreamHandler: StreamHandler? = null

override fun writeAndFlush(msg: Any): CompletableFuture<Unit> =
getOutboundHandler()?.ctx?.writeAndFlush(msg)?.toVoidCompletableFuture() ?: throw InternalErrorException("No active outbound stream to write data $msg")
getOutboundHandler()?.ctx?.writeAndFlush(msg)?.toVoidCompletableFuture() ?: completedExceptionally(
SemiDuplexNoOutboundStreamException("No active outbound stream to write data $msg"))

override fun isActive() = getOutboundHandler()?.ctx != null

Expand Down
2 changes: 0 additions & 2 deletions src/main/kotlin/io/libp2p/pubsub/Errors.kt
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,3 @@ class MessageAlreadySeenException(message: String) : PubsubException(message)
* Throw when message validation failed
*/
class InvalidMessageException(message: String) : PubsubException(message)

class InternalError
8 changes: 8 additions & 0 deletions src/test/kotlin/io/libp2p/etc/types/AsyncExtTest.kt
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.libp2p.etc.types

import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.Test
import java.util.concurrent.CompletableFuture

Expand All @@ -18,4 +19,11 @@ class AsyncExtTest {
assert(allFut.isDone)
assert(allFut.get() == 6)
}

@Test
fun testAnyCompleteWithCompletedExceptionally() {
val anyComplete = anyComplete<Int>(completedExceptionally(RuntimeException("test")))

Assertions.assertTrue(anyComplete.isCompletedExceptionally)
}
}
72 changes: 72 additions & 0 deletions src/test/kotlin/io/libp2p/pubsub/gossip/GossipPubsubRouterTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,16 @@ package io.libp2p.pubsub.gossip

import io.libp2p.etc.types.seconds
import io.libp2p.pubsub.DeterministicFuzz
import io.libp2p.pubsub.MockRouter
import io.libp2p.pubsub.PubsubRouterTest
import io.libp2p.pubsub.TestRouter
import io.libp2p.tools.TestLogAppender
import io.netty.handler.logging.LogLevel
import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.Test
import pubsub.pb.Rpc
import java.time.Duration
import java.util.concurrent.TimeUnit

class GossipPubsubRouterTest : PubsubRouterTest({
GossipRouter(
Expand Down Expand Up @@ -100,4 +104,72 @@ class GossipPubsubRouterTest : PubsubRouterTest({
Assertions.assertEquals(receiveRouters.size, msgCount3)
receiveRouters.forEach { it.inboundMessages.clear() }
}

@Test
fun testOneWayConnect() {
// when remote gossip makes connection and immediately send IHAVE
// the situation when we fail to send IWANT (as not outbound stream yet)
// shouldn't be treated as internal error and no WARN logs should be printed
val fuzz = DeterministicFuzz()

val router1 = fuzz.createTestRouter(MockRouter())
val router2 = fuzz.createTestRouter(router())
val mockRouter = router1.router as MockRouter

router2.router.subscribe("topic1")
router1.connect(router2, LogLevel.INFO, LogLevel.INFO)

TestLogAppender().install().use { testLogAppender ->
val msg1 = Rpc.RPC.newBuilder()
.setControl(
Rpc.ControlMessage.newBuilder().addIhave(
Rpc.ControlIHave.newBuilder().addMessageIDs("messageId")
)
).build()

mockRouter.sendToSingle(msg1)

Assertions.assertFalse(testLogAppender.hasAnyWarns())
}
}

@Test
fun testOneWayConnectPublish() {
// check that the published message is broadcasted successfully when one
// of gossip peers is yet 'partially' connected
val fuzz = DeterministicFuzz()

val router1 = fuzz.createTestRouter(MockRouter())
val router2 = fuzz.createTestRouter(router())
val router3 = fuzz.createTestRouter(router())
val mockRouter = router1.router as MockRouter

router2.router.subscribe("topic1")
router3.router.subscribe("topic1")
router1.connect(router2, LogLevel.INFO, LogLevel.INFO)
router2.connectSemiDuplex(router3, LogLevel.INFO, LogLevel.INFO)

TestLogAppender().install().use { testLogAppender ->

val msg1 = Rpc.RPC.newBuilder()
.addSubscriptions(Rpc.RPC.SubOpts.newBuilder()
.setTopicid("topic1")
.setSubscribe(true))
.setControl(
Rpc.ControlMessage.newBuilder().addGraft(
Rpc.ControlGraft.newBuilder().setTopicID("topic1")
))
.build()
mockRouter.sendToSingle(msg1)

fuzz.timeController.addTime(3.seconds)

val msg2 = newMessage("topic1", 1L, "Hello".toByteArray())
val future = router2.router.publish(msg2)
Assertions.assertDoesNotThrow { future.get(1, TimeUnit.SECONDS) }
Assertions.assertEquals(1, router3.inboundMessages.size)

Assertions.assertFalse(testLogAppender.hasAnyWarns())
}
}
}
32 changes: 32 additions & 0 deletions src/test/kotlin/io/libp2p/tools/TestLogAppender.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package io.libp2p.tools

import org.apache.logging.log4j.Level
import org.apache.logging.log4j.LogManager
import org.apache.logging.log4j.core.LogEvent
import org.apache.logging.log4j.core.Logger
import org.apache.logging.log4j.core.appender.AbstractAppender
import java.util.ArrayList

class TestLogAppender : AbstractAppender("test", null, null), AutoCloseable {
val logs: MutableList<LogEvent> = ArrayList()

fun install(): TestLogAppender {
(LogManager.getRootLogger() as Logger).addAppender(this)
return this
}

fun uninstall() {
(LogManager.getRootLogger() as Logger).removeAppender(this)
}

override fun close() {
uninstall()
}

fun hasAny(level: Level) = logs.any { it.level == level }
fun hasAnyWarns() = hasAny(Level.ERROR) || hasAny(Level.WARN)

override fun append(event: LogEvent) {
logs += event.toImmutable()
}
}