diff --git a/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt b/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt index d6ae8e4416..3b336427a0 100644 --- a/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt +++ b/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt @@ -104,35 +104,6 @@ internal abstract class AbstractSendChannel : SendChannel { return null } - /** - * Queues conflated element, returns null on success or - * returns node reference if it was already closed or is waiting for receive. - * @suppress **This is unstable API and it is subject to change.** - */ - protected fun sendConflated(element: E): ReceiveOrClosed<*>? { - val node = SendBuffered(element) - queue.addLastIfPrev(node, { prev -> - if (prev is ReceiveOrClosed<*>) return@sendConflated prev - true - }) - conflatePreviousSendBuffered(node) - return null - } - - protected fun conflatePreviousSendBuffered(node: LockFreeLinkedListNode) { - /* - * Conflate all previous SendBuffered, - * helping other sends to coflate - */ - var prev = node.prevNode - while (prev is SendBuffered<*>) { - if (!prev.remove()) { - prev.helpRemove() - } - prev = prev.prevNode - } - } - /** * @suppress **This is unstable API and it is subject to change.** */ @@ -331,7 +302,6 @@ internal abstract class AbstractSendChannel : SendChannel { previous as Receive // type assertion previous.resumeReceiveClosed(closed) } - onClosedIdempotent(closed) } @@ -499,7 +469,7 @@ internal abstract class AbstractSendChannel : SendChannel { override fun toString(): String = "SendSelect($pollResult)[$channel, $select]" } - private class SendBuffered( + internal class SendBuffered( @JvmField val element: E ) : LockFreeLinkedListNode(), Send { override val pollResult: Any? get() = element diff --git a/kotlinx-coroutines-core/common/src/channels/ConflatedChannel.kt b/kotlinx-coroutines-core/common/src/channels/ConflatedChannel.kt index 339bfd2c08..21a18832a4 100644 --- a/kotlinx-coroutines-core/common/src/channels/ConflatedChannel.kt +++ b/kotlinx-coroutines-core/common/src/channels/ConflatedChannel.kt @@ -25,7 +25,35 @@ internal open class ConflatedChannel : AbstractChannel() { protected final override val isBufferFull: Boolean get() = false override fun onClosedIdempotent(closed: LockFreeLinkedListNode) { - conflatePreviousSendBuffered(closed) + @Suppress("UNCHECKED_CAST") + (closed.prevNode as? SendBuffered)?.let { lastBuffered -> + conflatePreviousSendBuffered(lastBuffered) + } + } + + /** + * Queues conflated element, returns null on success or + * returns node reference if it was already closed or is waiting for receive. + */ + private fun sendConflated(element: E): ReceiveOrClosed<*>? { + val node = SendBuffered(element) + queue.addLastIfPrev(node) { prev -> + if (prev is ReceiveOrClosed<*>) return@sendConflated prev + true + } + conflatePreviousSendBuffered(node) + return null + } + + private fun conflatePreviousSendBuffered(node: SendBuffered) { + // Conflate all previous SendBuffered, helping other sends to conflate + var prev = node.prevNode + while (prev is SendBuffered<*>) { + if (!prev.remove()) { + prev.helpRemove() + } + prev = prev.prevNode + } } // result is always `OFFER_SUCCESS | Closed` @@ -35,20 +63,13 @@ internal open class ConflatedChannel : AbstractChannel() { when { result === OFFER_SUCCESS -> return OFFER_SUCCESS result === OFFER_FAILED -> { // try to buffer - val sendResult = sendConflated(element) - when (sendResult) { + when (val sendResult = sendConflated(element)) { null -> return OFFER_SUCCESS - is Closed<*> -> { - conflatePreviousSendBuffered(sendResult) - return sendResult - } + is Closed<*> -> return sendResult } // otherwise there was receiver in queue, retry super.offerInternal } - result is Closed<*> -> { - conflatePreviousSendBuffered(result) - return result - } + result is Closed<*> -> return result else -> error("Invalid offerInternal result $result") } } @@ -64,10 +85,7 @@ internal open class ConflatedChannel : AbstractChannel() { result === ALREADY_SELECTED -> return ALREADY_SELECTED result === OFFER_SUCCESS -> return OFFER_SUCCESS result === OFFER_FAILED -> {} // retry - result is Closed<*> -> { - conflatePreviousSendBuffered(result) - return result - } + result is Closed<*> -> return result else -> error("Invalid result $result") } } diff --git a/kotlinx-coroutines-core/common/test/channels/ConflatedChannelTest.kt b/kotlinx-coroutines-core/common/test/channels/ConflatedChannelTest.kt index 6b5e020d27..4deb3858f0 100644 --- a/kotlinx-coroutines-core/common/test/channels/ConflatedChannelTest.kt +++ b/kotlinx-coroutines-core/common/test/channels/ConflatedChannelTest.kt @@ -31,7 +31,13 @@ class ConflatedChannelTest : TestBase() { fun testConflatedClose() = runTest { val q = Channel(Channel.CONFLATED) q.send(1) - q.close() // shall conflate sent item and become closed + q.close() // shall become closed but do not conflate last sent item yet + assertTrue(q.isClosedForSend) + assertFalse(q.isClosedForReceive) + assertEquals(1, q.receive()) + // not it is closed for receive, too + assertTrue(q.isClosedForSend) + assertTrue(q.isClosedForReceive) assertNull(q.receiveOrNull()) } diff --git a/kotlinx-coroutines-core/common/test/flow/channels/ChannelFlowTest.kt b/kotlinx-coroutines-core/common/test/flow/channels/ChannelFlowTest.kt index 5d0292ef1e..e8754e1db4 100644 --- a/kotlinx-coroutines-core/common/test/flow/channels/ChannelFlowTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/channels/ChannelFlowTest.kt @@ -34,8 +34,10 @@ class ChannelFlowTest : TestBase() { val flow = channelFlow(bufferSize = Channel.CONFLATED) { assertTrue(offer(1)) assertTrue(offer(2)) + assertTrue(offer(3)) + assertTrue(offer(4)) } - assertEquals(listOf(1), flow.toList()) + assertEquals(listOf(1, 4), flow.toList()) // two elements in the middle got conflated } @Test