diff --git a/libp2p/src/main/kotlin/io/libp2p/etc/util/netty/mux/MuxId.kt b/libp2p/src/main/kotlin/io/libp2p/etc/util/netty/mux/MuxId.kt index 0d7051d9..25bb66f2 100644 --- a/libp2p/src/main/kotlin/io/libp2p/etc/util/netty/mux/MuxId.kt +++ b/libp2p/src/main/kotlin/io/libp2p/etc/util/netty/mux/MuxId.kt @@ -3,8 +3,8 @@ package io.libp2p.etc.util.netty.mux import io.netty.channel.ChannelId data class MuxId(val parentId: ChannelId, val id: Long, val initiator: Boolean) : ChannelId { - override fun asShortText() = "$parentId/$id/$initiator" - override fun asLongText() = asShortText() + override fun asShortText() = "${parentId.asShortText()}/$id/$initiator" + override fun asLongText() = "${parentId.asLongText()}/$id/$initiator" override fun compareTo(other: ChannelId?) = asShortText().compareTo(other?.asShortText() ?: "") override fun toString() = asLongText() } diff --git a/libp2p/src/main/kotlin/io/libp2p/mux/yamux/YamuxFrame.kt b/libp2p/src/main/kotlin/io/libp2p/mux/yamux/YamuxFrame.kt index fefdf1ae..32bd32e6 100644 --- a/libp2p/src/main/kotlin/io/libp2p/mux/yamux/YamuxFrame.kt +++ b/libp2p/src/main/kotlin/io/libp2p/mux/yamux/YamuxFrame.kt @@ -8,16 +8,18 @@ import io.netty.buffer.Unpooled /** * Contains the fields that comprise a yamux frame. - * @param streamId the ID of the stream. - * @param flag the flag value for this frame. + * @param id the ID of the stream. + * @param flags the flags value for this frame. + * @param length the length field for this frame. * @param data the data segment. */ -class YamuxFrame(val id: MuxId, val type: Int, val flags: Int, val lenData: Long, val data: ByteBuf? = null) : +class YamuxFrame(val id: MuxId, val type: Int, val flags: Int, val length: Long, val data: ByteBuf? = null) : DefaultByteBufHolder(data ?: Unpooled.EMPTY_BUFFER) { override fun toString(): String { - if (data == null) - return "YamuxFrame(id=$id, type=$type, flag=$flags)" - return "YamuxFrame(id=$id, type=$type, flag=$flags, data=${String(data.toByteArray())})" + if (data == null) { + return "YamuxFrame(id=$id, type=$type, flags=$flags, length=$length)" + } + return "YamuxFrame(id=$id, type=$type, flags=$flags, length=$length, data=${String(data.toByteArray())})" } } diff --git a/libp2p/src/main/kotlin/io/libp2p/mux/yamux/YamuxFrameCodec.kt b/libp2p/src/main/kotlin/io/libp2p/mux/yamux/YamuxFrameCodec.kt index d21fb2d4..d8a8e267 100644 --- a/libp2p/src/main/kotlin/io/libp2p/mux/yamux/YamuxFrameCodec.kt +++ b/libp2p/src/main/kotlin/io/libp2p/mux/yamux/YamuxFrameCodec.kt @@ -29,7 +29,7 @@ class YamuxFrameCodec( out.writeByte(msg.type) out.writeShort(msg.flags) out.writeInt(msg.id.id.toInt()) - out.writeInt(msg.data?.readableBytes() ?: msg.lenData.toInt()) + out.writeInt(msg.data?.readableBytes() ?: msg.length.toInt()) out.writeBytes(msg.data ?: Unpooled.EMPTY_BUFFER) } @@ -42,32 +42,44 @@ class YamuxFrameCodec( */ override fun decode(ctx: ChannelHandlerContext, msg: ByteBuf, out: MutableList) { while (msg.isReadable) { - if (msg.readableBytes() < 12) + if (msg.readableBytes() < 12) { return + } val readerIndex = msg.readerIndex() msg.readByte(); // version always 0 val type = msg.readUnsignedByte() val flags = msg.readUnsignedShort() val streamId = msg.readUnsignedInt() - val lenData = msg.readUnsignedInt() + val length = msg.readUnsignedInt() if (type.toInt() != YamuxType.DATA) { - val yamuxFrame = YamuxFrame(MuxId(ctx.channel().id(), streamId, isInitiator.xor(streamId.mod(2).equals(1)).not()), type.toInt(), flags, lenData) + val yamuxFrame = YamuxFrame( + MuxId(ctx.channel().id(), streamId, isInitiator.xor(streamId.mod(2) == 1).not()), + type.toInt(), + flags, + length + ) out.add(yamuxFrame) continue } - if (lenData > maxFrameDataLength) { + if (length > maxFrameDataLength) { msg.skipBytes(msg.readableBytes()) - throw ProtocolViolationException("Yamux frame is too large: $lenData") + throw ProtocolViolationException("Yamux frame is too large: $length") } - if (msg.readableBytes() < lenData) { + if (msg.readableBytes() < length) { // not enough data to read the frame content // will wait for more ... msg.readerIndex(readerIndex) return } - val data = msg.readSlice(lenData.toInt()) + val data = msg.readSlice(length.toInt()) data.retain() // MessageToMessageCodec releases original buffer, but it needs to be relayed - val yamuxFrame = YamuxFrame(MuxId(ctx.channel().id(), streamId, isInitiator.xor(streamId.mod(2).equals(1)).not()), type.toInt(), flags, lenData, data) + val yamuxFrame = YamuxFrame( + MuxId(ctx.channel().id(), streamId, isInitiator.xor(streamId.mod(2) == 1).not()), + type.toInt(), + flags, + length, + data + ) out.add(yamuxFrame) } } diff --git a/libp2p/src/main/kotlin/io/libp2p/mux/yamux/YamuxHandler.kt b/libp2p/src/main/kotlin/io/libp2p/mux/yamux/YamuxHandler.kt index 887745c0..072051b7 100644 --- a/libp2p/src/main/kotlin/io/libp2p/mux/yamux/YamuxHandler.kt +++ b/libp2p/src/main/kotlin/io/libp2p/mux/yamux/YamuxHandler.kt @@ -10,6 +10,7 @@ import io.libp2p.etc.util.netty.mux.MuxId import io.libp2p.mux.MuxHandler import io.netty.buffer.ByteBuf import io.netty.channel.ChannelHandlerContext +import org.slf4j.LoggerFactory import java.util.concurrent.CompletableFuture import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.atomic.AtomicInteger @@ -17,6 +18,8 @@ import java.util.concurrent.atomic.AtomicInteger const val INITIAL_WINDOW_SIZE = 256 * 1024 const val MAX_BUFFERED_CONNECTION_WRITES = 1024 * 1024 +private val log = LoggerFactory.getLogger(YamuxHandler::class.java) + open class YamuxHandler( override val multistreamProtocol: MultistreamProtocol, override val maxFrameDataLength: Int, @@ -39,7 +42,7 @@ open class YamuxHandler( fun flush(sendWindow: AtomicInteger, id: MuxId): Int { var written = 0 - while (! buffered.isEmpty()) { + while (!buffered.isEmpty()) { val buf = buffered.first() val readableBytes = buf.readableBytes() if (readableBytes + written < sendWindow.get()) { @@ -65,19 +68,27 @@ open class YamuxHandler( YamuxType.DATA -> handleDataRead(msg) YamuxType.WINDOW_UPDATE -> handleWindowUpdate(msg) YamuxType.PING -> handlePing(msg) - YamuxType.GO_AWAY -> onRemoteClose(msg.id) + YamuxType.GO_AWAY -> handleGoAway(msg) } } - fun handlePing(msg: YamuxFrame) { + private fun handlePing(msg: YamuxFrame) { val ctx = getChannelHandlerContext() when (msg.flags) { - YamuxFlags.SYN -> ctx.writeAndFlush(YamuxFrame(MuxId(msg.id.parentId, 0, msg.id.initiator), YamuxType.PING, YamuxFlags.ACK, msg.lenData)) + YamuxFlags.SYN -> ctx.writeAndFlush( + YamuxFrame( + MuxId(msg.id.parentId, 0, msg.id.initiator), + YamuxType.PING, + YamuxFlags.ACK, + msg.length + ) + ) + YamuxFlags.ACK -> {} } } - fun handleFlags(msg: YamuxFrame) { + private fun handleFlags(msg: YamuxFrame) { val ctx = getChannelHandlerContext() when (msg.flags) { YamuxFlags.SYN -> { @@ -85,18 +96,20 @@ open class YamuxHandler( onRemoteYamuxOpen(msg.id) ctx.writeAndFlush(YamuxFrame(msg.id, YamuxType.WINDOW_UPDATE, YamuxFlags.ACK, 0)) } + YamuxFlags.FIN -> onRemoteDisconnect(msg.id) YamuxFlags.RST -> onRemoteClose(msg.id) } } - fun handleDataRead(msg: YamuxFrame) { + private fun handleDataRead(msg: YamuxFrame) { val ctx = getChannelHandlerContext() - val size = msg.lenData + val size = msg.length handleFlags(msg) - if (size.toInt() == 0) + if (size.toInt() == 0) { return - val recWindow = receiveWindows.get(msg.id) + } + val recWindow = receiveWindows[msg.id] if (recWindow == null) { releaseMessage(msg.data!!) throw Libp2pException("No receive window for " + msg.id) @@ -111,36 +124,38 @@ open class YamuxHandler( childRead(msg.id, msg.data!!) } - fun handleWindowUpdate(msg: YamuxFrame) { + private fun handleWindowUpdate(msg: YamuxFrame) { handleFlags(msg) - val size = msg.lenData.toInt() - if (size == 0) - return - val sendWindow = sendWindows.get(msg.id) - if (sendWindow == null) { + val size = msg.length.toInt() + if (size == 0) { return } + val sendWindow = sendWindows[msg.id] ?: return sendWindow.addAndGet(size) - val buffer = sendBuffers.get(msg.id) + val buffer = sendBuffers[msg.id] if (buffer != null) { val writtenBytes = buffer.flush(sendWindow, msg.id) totalBufferedWrites.addAndGet(-writtenBytes) } } + private fun handleGoAway(msg: YamuxFrame) { + log.debug("Session will be terminated. Go Away message with with error code ${msg.length} has been received.") + onRemoteClose(msg.id) + } + override fun onChildWrite(child: MuxChannel, data: ByteBuf) { val ctx = getChannelHandlerContext() - val sendWindow = sendWindows.get(child.id) - if (sendWindow == null) { - throw Libp2pException("No send window for " + child.id) - } + val sendWindow = sendWindows[child.id] ?: throw Libp2pException("No send window for " + child.id) + if (sendWindow.get() <= 0) { // wait until the window is increased to send more data - val buffer = sendBuffers.getOrPut(child.id, { SendBuffer(ctx) }) + val buffer = sendBuffers.getOrPut(child.id) { SendBuffer(ctx) } buffer.add(data) - if (totalBufferedWrites.addAndGet(data.readableBytes()) > MAX_BUFFERED_CONNECTION_WRITES) + if (totalBufferedWrites.addAndGet(data.readableBytes()) > MAX_BUFFERED_CONNECTION_WRITES) { throw Libp2pException("Overflowed send buffer for connection") + } return } sendBlocks(ctx, data, sendWindow, child.id) diff --git a/libp2p/src/main/kotlin/io/libp2p/mux/yamux/YamuxType.kt b/libp2p/src/main/kotlin/io/libp2p/mux/yamux/YamuxType.kt index cf66f4b8..0746c8cf 100644 --- a/libp2p/src/main/kotlin/io/libp2p/mux/yamux/YamuxType.kt +++ b/libp2p/src/main/kotlin/io/libp2p/mux/yamux/YamuxType.kt @@ -1,7 +1,7 @@ package io.libp2p.mux.yamux /** - * Contains all the permissible values for flags in the yamux protocol. + * Contains all the permissible values for types in the yamux protocol. */ object YamuxType { const val DATA = 0 diff --git a/libp2p/src/test/kotlin/io/libp2p/mux/MuxHandlerAbstractTest.kt b/libp2p/src/test/kotlin/io/libp2p/mux/MuxHandlerAbstractTest.kt index 39478f28..83792b55 100644 --- a/libp2p/src/test/kotlin/io/libp2p/mux/MuxHandlerAbstractTest.kt +++ b/libp2p/src/test/kotlin/io/libp2p/mux/MuxHandlerAbstractTest.kt @@ -6,9 +6,11 @@ import io.libp2p.core.StreamHandler import io.libp2p.etc.types.fromHex import io.libp2p.etc.types.getX import io.libp2p.etc.types.toHex +import io.libp2p.etc.util.netty.mux.MuxId import io.libp2p.etc.util.netty.mux.RemoteWriteClosed import io.libp2p.etc.util.netty.nettyInitializer import io.libp2p.mux.MuxHandlerAbstractTest.AbstractTestMuxFrame.Flag.* +import io.libp2p.mux.MuxHandlerAbstractTest.TestEventHandler import io.libp2p.tools.TestChannel import io.libp2p.tools.readAllBytesAndRelease import io.netty.buffer.ByteBuf @@ -20,10 +22,7 @@ import io.netty.handler.logging.LoggingHandler import org.assertj.core.api.Assertions.assertThat import org.assertj.core.data.Index import org.junit.jupiter.api.AfterEach -import org.junit.jupiter.api.Assertions.assertEquals -import org.junit.jupiter.api.Assertions.assertFalse -import org.junit.jupiter.api.Assertions.assertThrows -import org.junit.jupiter.api.Assertions.assertTrue +import org.junit.jupiter.api.Assertions.* import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import java.util.concurrent.CompletableFuture @@ -95,10 +94,11 @@ abstract class MuxHandlerAbstractTest { enum class Flag { Open, Data, Close, Reset } } + fun Long.toMuxId() = MuxId(parentChannelId, this, true) + abstract fun writeFrame(frame: AbstractTestMuxFrame) abstract fun readFrame(): AbstractTestMuxFrame? fun readFrameOrThrow() = readFrame() ?: throw AssertionError("No outbound frames") - fun openStream(id: Long) = writeFrame(AbstractTestMuxFrame(id, Open)) fun writeStream(id: Long, msg: String) = writeFrame(AbstractTestMuxFrame(id, Data, msg)) fun closeStream(id: Long) = writeFrame(AbstractTestMuxFrame(id, Close)) @@ -478,7 +478,7 @@ abstract class MuxHandlerAbstractTest { override fun handlerAdded(ctx: ChannelHandlerContext) { assertFalse(isHandlerAdded) isHandlerAdded = true - println("MultiplexHandlerTest.handlerAdded") + println("MuxHandlerAbstractTest.handlerAdded") this.ctx = ctx } @@ -486,58 +486,58 @@ abstract class MuxHandlerAbstractTest { assertTrue(isHandlerAdded) assertFalse(isRegistered) isRegistered = true - println("MultiplexHandlerTest.channelRegistered") + println("MuxHandlerAbstractTest.channelRegistered") } override fun channelActive(ctx: ChannelHandlerContext) { assertTrue(isRegistered) assertFalse(isActivated) isActivated = true - println("MultiplexHandlerTest.channelActive") + println("MuxHandlerAbstractTest.channelActive") activeEventHandlers.forEach { it.handle(this) } } override fun channelRead(ctx: ChannelHandlerContext, msg: Any) { assertTrue(isActivated) - println("MultiplexHandlerTest.channelRead") + println("MuxHandlerAbstractTest.channelRead") msg as ByteBuf inboundMessages += msg.readAllBytesAndRelease().toHex() } override fun channelReadComplete(ctx: ChannelHandlerContext?) { readCompleteEventCount++ - println("MultiplexHandlerTest.channelReadComplete") + println("MuxHandlerAbstractTest.channelReadComplete") } override fun userEventTriggered(ctx: ChannelHandlerContext, evt: Any) { userEvents += evt - println("MultiplexHandlerTest.userEventTriggered: $evt") + println("MuxHandlerAbstractTest.userEventTriggered: $evt") } override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) { exceptions += cause - println("MultiplexHandlerTest.exceptionCaught") + println("MuxHandlerAbstractTest.exceptionCaught") } override fun channelInactive(ctx: ChannelHandlerContext) { assertTrue(isActivated) assertFalse(isInactivated) isInactivated = true - println("MultiplexHandlerTest.channelInactive") + println("MuxHandlerAbstractTest.channelInactive") } override fun channelUnregistered(ctx: ChannelHandlerContext?) { assertTrue(isInactivated) assertFalse(isUnregistered) isUnregistered = true - println("MultiplexHandlerTest.channelUnregistered") + println("MuxHandlerAbstractTest.channelUnregistered") } override fun handlerRemoved(ctx: ChannelHandlerContext?) { assertTrue(isUnregistered) assertFalse(isHandlerRemoved) isHandlerRemoved = true - println("MultiplexHandlerTest.handlerRemoved") + println("MuxHandlerAbstractTest.handlerRemoved") } } diff --git a/libp2p/src/test/kotlin/io/libp2p/mux/mplex/MplexHandlerTest.kt b/libp2p/src/test/kotlin/io/libp2p/mux/mplex/MplexHandlerTest.kt index 09110733..bd9fd88f 100644 --- a/libp2p/src/test/kotlin/io/libp2p/mux/mplex/MplexHandlerTest.kt +++ b/libp2p/src/test/kotlin/io/libp2p/mux/mplex/MplexHandlerTest.kt @@ -4,7 +4,6 @@ import io.libp2p.core.StreamHandler import io.libp2p.core.multistream.MultistreamProtocolV1 import io.libp2p.etc.types.fromHex import io.libp2p.etc.types.toHex -import io.libp2p.etc.util.netty.mux.MuxId import io.libp2p.mux.MuxHandler import io.libp2p.mux.MuxHandlerAbstractTest import io.libp2p.mux.MuxHandlerAbstractTest.AbstractTestMuxFrame.Flag.* @@ -28,6 +27,7 @@ class MplexHandlerTest : MuxHandlerAbstractTest() { } override fun writeFrame(frame: AbstractTestMuxFrame) { + val muxId = frame.streamId.toMuxId() val mplexFlag = when (frame.flag) { Open -> MplexFlag.Type.OPEN Data -> MplexFlag.Type.DATA @@ -39,7 +39,7 @@ class MplexHandlerTest : MuxHandlerAbstractTest() { else -> frame.data.fromHex().toByteBuf(allocateBuf()) } val mplexFrame = - MplexFrame(MuxId(parentChannelId, frame.streamId, true), MplexFlag.getByType(mplexFlag, true), data) + MplexFrame(muxId, MplexFlag.getByType(mplexFlag, true), data) ech.writeInbound(mplexFrame) } @@ -51,10 +51,9 @@ class MplexHandlerTest : MuxHandlerAbstractTest() { MplexFlag.Type.DATA -> Data MplexFlag.Type.CLOSE -> Close MplexFlag.Type.RESET -> Reset - else -> throw AssertionError("Unknown mplex flag: ${mplexFrame.flag}") } - val sData = maybeMplexFrame.data.readAllBytesAndRelease().toHex() - AbstractTestMuxFrame(mplexFrame.id.id, flag, sData) + val data = maybeMplexFrame.data.readAllBytesAndRelease().toHex() + AbstractTestMuxFrame(mplexFrame.id.id, flag, data) } } } diff --git a/libp2p/src/test/kotlin/io/libp2p/mux/yamux/YamuxHandlerTest.kt b/libp2p/src/test/kotlin/io/libp2p/mux/yamux/YamuxHandlerTest.kt index 4fc35691..640dc8d4 100644 --- a/libp2p/src/test/kotlin/io/libp2p/mux/yamux/YamuxHandlerTest.kt +++ b/libp2p/src/test/kotlin/io/libp2p/mux/yamux/YamuxHandlerTest.kt @@ -4,7 +4,6 @@ import io.libp2p.core.StreamHandler import io.libp2p.core.multistream.MultistreamProtocolV1 import io.libp2p.etc.types.fromHex import io.libp2p.etc.types.toHex -import io.libp2p.etc.util.netty.mux.MuxId import io.libp2p.mux.MuxHandler import io.libp2p.mux.MuxHandlerAbstractTest import io.libp2p.mux.MuxHandlerAbstractTest.AbstractTestMuxFrame.Flag.* @@ -20,7 +19,11 @@ class YamuxHandlerTest : MuxHandlerAbstractTest() { override fun createMuxHandler(streamHandler: StreamHandler<*>): MuxHandler = object : YamuxHandler( - MultistreamProtocolV1, maxFrameDataLength, null, streamHandler, true + MultistreamProtocolV1, + maxFrameDataLength, + null, + streamHandler, + true ) { // MuxHandler consumes the exception. Override this behaviour for testing @Deprecated("Deprecated in Java") @@ -29,19 +32,21 @@ class YamuxHandlerTest : MuxHandlerAbstractTest() { } } - private fun Long.toMuxId() = MuxId(parentChannelId, this, true) - override fun writeFrame(frame: AbstractTestMuxFrame) { val muxId = frame.streamId.toMuxId() val yamuxFrame = when (frame.flag) { Open -> YamuxFrame(muxId, YamuxType.DATA, YamuxFlags.SYN, 0) - Data -> YamuxFrame( - muxId, - YamuxType.DATA, - 0, - frame.data.fromHex().size.toLong(), - frame.data.fromHex().toByteBuf(allocateBuf()) - ) + Data -> { + val data = frame.data.fromHex() + YamuxFrame( + muxId, + YamuxType.DATA, + 0, + data.size.toLong(), + data.toByteBuf(allocateBuf()) + ) + } + Close -> YamuxFrame(muxId, YamuxType.DATA, YamuxFlags.FIN, 0) Reset -> YamuxFrame(muxId, YamuxType.DATA, YamuxFlags.RST, 0) } @@ -49,7 +54,7 @@ class YamuxHandlerTest : MuxHandlerAbstractTest() { } override fun readFrame(): AbstractTestMuxFrame? { - val yamuxFrame = ech.readOutbound() + val yamuxFrame = readYamuxFrame() if (yamuxFrame != null) { when (yamuxFrame.flags) { YamuxFlags.SYN -> readFrameQueue += AbstractTestMuxFrame(yamuxFrame.id.id, Open) @@ -70,6 +75,49 @@ class YamuxHandlerTest : MuxHandlerAbstractTest() { return readFrameQueue.removeFirstOrNull() } + private fun readYamuxFrame(): YamuxFrame? { + return ech.readOutbound() + } + + private fun readYamuxFrameOrThrow() = readYamuxFrame() ?: throw AssertionError("No outbound frames") + + @Test + fun `test ack new stream`() { + // signal opening of new stream + openStream(12) + + writeStream(12, "23") + + val ackFrame = readYamuxFrameOrThrow() + + // receives ack stream + assertThat(ackFrame.flags).isEqualTo(YamuxFlags.ACK) + assertThat(ackFrame.type).isEqualTo(YamuxType.WINDOW_UPDATE) + + closeStream(12) + } + + @Test + fun `test window update`() { + openStream(12) + + val largeMessage = "42".repeat(INITIAL_WINDOW_SIZE + 1) + writeStream(12, largeMessage) + + // ignore ack stream frame + readYamuxFrameOrThrow() + + val windowUpdateFrame = readYamuxFrameOrThrow() + + assertThat(windowUpdateFrame.flags).isZero() + assertThat(windowUpdateFrame.type).isEqualTo(YamuxType.WINDOW_UPDATE) + assertThat(windowUpdateFrame.length).isEqualTo((INITIAL_WINDOW_SIZE + 1).toLong()) + + assertLastMessage(0, 1, largeMessage) + + closeStream(12) + } + @Test fun `data should be buffered and sent after window increased from zero`() { val handler = openStreamByLocal() @@ -92,4 +140,49 @@ class YamuxHandlerTest : MuxHandlerAbstractTest() { val frame = readFrameOrThrow() assertThat(frame.data).isEqualTo("1984") } + + @Test + fun `test ping`() { + val id: Long = 0 + openStream(id) + ech.writeInbound( + YamuxFrame( + id.toMuxId(), + YamuxType.PING, + YamuxFlags.SYN, + // opaque value, echoed back + 3 + ) + ) + + // ignore ack stream frame + readYamuxFrameOrThrow() + + val pingFrame = readYamuxFrameOrThrow() + + assertThat(pingFrame.flags).isEqualTo(YamuxFlags.ACK) + assertThat(pingFrame.type).isEqualTo(YamuxType.PING) + assertThat(pingFrame.length).isEqualTo(3) + + closeStream(id) + } + + @Test + fun `test go away`() { + val id: Long = 0 + openStream(id) + ech.writeInbound( + YamuxFrame( + id.toMuxId(), + YamuxType.GO_AWAY, + 0, + // normal termination + 0x0 + ) + ) + + // verify session termination + assertThat(childHandlers[0].isHandlerRemoved).isTrue() + assertThat(childHandlers[0].isUnregistered).isTrue() + } }