diff --git a/src/main/kotlin/io/libp2p/core/Libp2pException.kt b/src/main/kotlin/io/libp2p/core/Libp2pException.kt index cae18cd45..5592e65e6 100644 --- a/src/main/kotlin/io/libp2p/core/Libp2pException.kt +++ b/src/main/kotlin/io/libp2p/core/Libp2pException.kt @@ -64,4 +64,10 @@ open class TransportNotSupportedException(message: String) : Libp2pException(mes /** * Indicates the message received from a remote party violates protocol */ -open class ProtocolViolationException(message: String) : Libp2pException(message) \ No newline at end of file +open class ProtocolViolationException(message: String) : Libp2pException(message) + +/** + * When trying to write a message to a peer within [io.libp2p.etc.util.P2PServiceSemiDuplex] + * but there is no yet outbound stream created. + */ +open class SemiDuplexNoOutboundStreamException(message: String) : Libp2pException(message) \ No newline at end of file diff --git a/src/main/kotlin/io/libp2p/etc/types/AsyncExt.kt b/src/main/kotlin/io/libp2p/etc/types/AsyncExt.kt index 6476b647b..1e3c79b08 100644 --- a/src/main/kotlin/io/libp2p/etc/types/AsyncExt.kt +++ b/src/main/kotlin/io/libp2p/etc/types/AsyncExt.kt @@ -62,6 +62,7 @@ fun anyComplete(vararg all: CompletableFuture): CompletableFuture { return if (all.isEmpty()) completedExceptionally(NothingToCompleteException()) else object : CompletableFuture() { init { + val counter = AtomicInteger(all.size) all.forEach { it.whenComplete { v, t -> if (t == null) { complete(v) @@ -70,7 +71,6 @@ fun anyComplete(vararg all: CompletableFuture): CompletableFuture { } } } } - val counter = AtomicInteger(all.size) } } diff --git a/src/main/kotlin/io/libp2p/etc/util/P2PServiceSemiDuplex.kt b/src/main/kotlin/io/libp2p/etc/util/P2PServiceSemiDuplex.kt index 9b799d85e..e707cb4da 100644 --- a/src/main/kotlin/io/libp2p/etc/util/P2PServiceSemiDuplex.kt +++ b/src/main/kotlin/io/libp2p/etc/util/P2PServiceSemiDuplex.kt @@ -1,7 +1,8 @@ package io.libp2p.etc.util import io.libp2p.core.BadPeerException -import io.libp2p.core.InternalErrorException +import io.libp2p.core.SemiDuplexNoOutboundStreamException +import io.libp2p.etc.types.completedExceptionally import io.libp2p.etc.types.toVoidCompletableFuture import java.util.concurrent.CompletableFuture @@ -17,7 +18,8 @@ abstract class P2PServiceSemiDuplex : P2PService() { var otherStreamHandler: StreamHandler? = null override fun writeAndFlush(msg: Any): CompletableFuture = - getOutboundHandler()?.ctx?.writeAndFlush(msg)?.toVoidCompletableFuture() ?: throw InternalErrorException("No active outbound stream to write data $msg") + getOutboundHandler()?.ctx?.writeAndFlush(msg)?.toVoidCompletableFuture() ?: completedExceptionally( + SemiDuplexNoOutboundStreamException("No active outbound stream to write data $msg")) override fun isActive() = getOutboundHandler()?.ctx != null diff --git a/src/main/kotlin/io/libp2p/pubsub/Errors.kt b/src/main/kotlin/io/libp2p/pubsub/Errors.kt index 1f9bb5b2f..c750f2494 100644 --- a/src/main/kotlin/io/libp2p/pubsub/Errors.kt +++ b/src/main/kotlin/io/libp2p/pubsub/Errors.kt @@ -16,5 +16,3 @@ class MessageAlreadySeenException(message: String) : PubsubException(message) * Throw when message validation failed */ class InvalidMessageException(message: String) : PubsubException(message) - -class InternalError \ No newline at end of file diff --git a/src/test/kotlin/io/libp2p/etc/types/AsyncExtTest.kt b/src/test/kotlin/io/libp2p/etc/types/AsyncExtTest.kt index 1538099e4..bb95965d4 100644 --- a/src/test/kotlin/io/libp2p/etc/types/AsyncExtTest.kt +++ b/src/test/kotlin/io/libp2p/etc/types/AsyncExtTest.kt @@ -1,5 +1,6 @@ package io.libp2p.etc.types +import org.junit.jupiter.api.Assertions import org.junit.jupiter.api.Test import java.util.concurrent.CompletableFuture @@ -18,4 +19,11 @@ class AsyncExtTest { assert(allFut.isDone) assert(allFut.get() == 6) } + + @Test + fun testAnyCompleteWithCompletedExceptionally() { + val anyComplete = anyComplete(completedExceptionally(RuntimeException("test"))) + + Assertions.assertTrue(anyComplete.isCompletedExceptionally) + } } \ No newline at end of file diff --git a/src/test/kotlin/io/libp2p/pubsub/gossip/GossipPubsubRouterTest.kt b/src/test/kotlin/io/libp2p/pubsub/gossip/GossipPubsubRouterTest.kt index c9c430dee..6adf43b0b 100644 --- a/src/test/kotlin/io/libp2p/pubsub/gossip/GossipPubsubRouterTest.kt +++ b/src/test/kotlin/io/libp2p/pubsub/gossip/GossipPubsubRouterTest.kt @@ -2,12 +2,16 @@ package io.libp2p.pubsub.gossip import io.libp2p.etc.types.seconds import io.libp2p.pubsub.DeterministicFuzz +import io.libp2p.pubsub.MockRouter import io.libp2p.pubsub.PubsubRouterTest import io.libp2p.pubsub.TestRouter +import io.libp2p.tools.TestLogAppender import io.netty.handler.logging.LogLevel import org.junit.jupiter.api.Assertions import org.junit.jupiter.api.Test +import pubsub.pb.Rpc import java.time.Duration +import java.util.concurrent.TimeUnit class GossipPubsubRouterTest : PubsubRouterTest({ GossipRouter( @@ -100,4 +104,72 @@ class GossipPubsubRouterTest : PubsubRouterTest({ Assertions.assertEquals(receiveRouters.size, msgCount3) receiveRouters.forEach { it.inboundMessages.clear() } } + + @Test + fun testOneWayConnect() { + // when remote gossip makes connection and immediately send IHAVE + // the situation when we fail to send IWANT (as not outbound stream yet) + // shouldn't be treated as internal error and no WARN logs should be printed + val fuzz = DeterministicFuzz() + + val router1 = fuzz.createTestRouter(MockRouter()) + val router2 = fuzz.createTestRouter(router()) + val mockRouter = router1.router as MockRouter + + router2.router.subscribe("topic1") + router1.connect(router2, LogLevel.INFO, LogLevel.INFO) + + TestLogAppender().install().use { testLogAppender -> + val msg1 = Rpc.RPC.newBuilder() + .setControl( + Rpc.ControlMessage.newBuilder().addIhave( + Rpc.ControlIHave.newBuilder().addMessageIDs("messageId") + ) + ).build() + + mockRouter.sendToSingle(msg1) + + Assertions.assertFalse(testLogAppender.hasAnyWarns()) + } + } + + @Test + fun testOneWayConnectPublish() { + // check that the published message is broadcasted successfully when one + // of gossip peers is yet 'partially' connected + val fuzz = DeterministicFuzz() + + val router1 = fuzz.createTestRouter(MockRouter()) + val router2 = fuzz.createTestRouter(router()) + val router3 = fuzz.createTestRouter(router()) + val mockRouter = router1.router as MockRouter + + router2.router.subscribe("topic1") + router3.router.subscribe("topic1") + router1.connect(router2, LogLevel.INFO, LogLevel.INFO) + router2.connectSemiDuplex(router3, LogLevel.INFO, LogLevel.INFO) + + TestLogAppender().install().use { testLogAppender -> + + val msg1 = Rpc.RPC.newBuilder() + .addSubscriptions(Rpc.RPC.SubOpts.newBuilder() + .setTopicid("topic1") + .setSubscribe(true)) + .setControl( + Rpc.ControlMessage.newBuilder().addGraft( + Rpc.ControlGraft.newBuilder().setTopicID("topic1") + )) + .build() + mockRouter.sendToSingle(msg1) + + fuzz.timeController.addTime(3.seconds) + + val msg2 = newMessage("topic1", 1L, "Hello".toByteArray()) + val future = router2.router.publish(msg2) + Assertions.assertDoesNotThrow { future.get(1, TimeUnit.SECONDS) } + Assertions.assertEquals(1, router3.inboundMessages.size) + + Assertions.assertFalse(testLogAppender.hasAnyWarns()) + } + } } diff --git a/src/test/kotlin/io/libp2p/tools/TestLogAppender.kt b/src/test/kotlin/io/libp2p/tools/TestLogAppender.kt new file mode 100644 index 000000000..2e70fff1f --- /dev/null +++ b/src/test/kotlin/io/libp2p/tools/TestLogAppender.kt @@ -0,0 +1,32 @@ +package io.libp2p.tools + +import org.apache.logging.log4j.Level +import org.apache.logging.log4j.LogManager +import org.apache.logging.log4j.core.LogEvent +import org.apache.logging.log4j.core.Logger +import org.apache.logging.log4j.core.appender.AbstractAppender +import java.util.ArrayList + +class TestLogAppender : AbstractAppender("test", null, null), AutoCloseable { + val logs: MutableList = ArrayList() + + fun install(): TestLogAppender { + (LogManager.getRootLogger() as Logger).addAppender(this) + return this + } + + fun uninstall() { + (LogManager.getRootLogger() as Logger).removeAppender(this) + } + + override fun close() { + uninstall() + } + + fun hasAny(level: Level) = logs.any { it.level == level } + fun hasAnyWarns() = hasAny(Level.ERROR) || hasAny(Level.WARN) + + override fun append(event: LogEvent) { + logs += event.toImmutable() + } +} \ No newline at end of file