Skip to content

Commit

Permalink
Adjust behavior of conflated channel to deliver last value
Browse files Browse the repository at this point in the history
Fixes #1235
Fixes #332
  • Loading branch information
elizarov committed Jun 4, 2019
1 parent 15ee8a3 commit a2024f6
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 48 deletions.
32 changes: 1 addition & 31 deletions kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt
Original file line number Diff line number Diff line change
Expand Up @@ -104,35 +104,6 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
return null
}

/**
* Queues conflated element, returns null on success or
* returns node reference if it was already closed or is waiting for receive.
* @suppress **This is unstable API and it is subject to change.**
*/
protected fun sendConflated(element: E): ReceiveOrClosed<*>? {
val node = SendBuffered(element)
queue.addLastIfPrev(node, { prev ->
if (prev is ReceiveOrClosed<*>) return@sendConflated prev
true
})
conflatePreviousSendBuffered(node)
return null
}

protected fun conflatePreviousSendBuffered(node: LockFreeLinkedListNode) {
/*
* Conflate all previous SendBuffered,
* helping other sends to coflate
*/
var prev = node.prevNode
while (prev is SendBuffered<*>) {
if (!prev.remove()) {
prev.helpRemove()
}
prev = prev.prevNode
}
}

/**
* @suppress **This is unstable API and it is subject to change.**
*/
Expand Down Expand Up @@ -331,7 +302,6 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
previous as Receive<E> // type assertion
previous.resumeReceiveClosed(closed)
}

onClosedIdempotent(closed)
}

Expand Down Expand Up @@ -499,7 +469,7 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
override fun toString(): String = "SendSelect($pollResult)[$channel, $select]"
}

private class SendBuffered<out E>(
internal class SendBuffered<out E>(
@JvmField val element: E
) : LockFreeLinkedListNode(), Send {
override val pollResult: Any? get() = element
Expand Down
48 changes: 33 additions & 15 deletions kotlinx-coroutines-core/common/src/channels/ConflatedChannel.kt
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,35 @@ internal open class ConflatedChannel<E> : AbstractChannel<E>() {
protected final override val isBufferFull: Boolean get() = false

override fun onClosedIdempotent(closed: LockFreeLinkedListNode) {
conflatePreviousSendBuffered(closed)
@Suppress("UNCHECKED_CAST")
(closed.prevNode as? SendBuffered<E>)?.let { lastBuffered ->
conflatePreviousSendBuffered(lastBuffered)
}
}

/**
* Queues conflated element, returns null on success or
* returns node reference if it was already closed or is waiting for receive.
*/
private fun sendConflated(element: E): ReceiveOrClosed<*>? {
val node = SendBuffered(element)
queue.addLastIfPrev(node) { prev ->
if (prev is ReceiveOrClosed<*>) return@sendConflated prev
true
}
conflatePreviousSendBuffered(node)
return null
}

private fun conflatePreviousSendBuffered(node: SendBuffered<E>) {
// Conflate all previous SendBuffered, helping other sends to conflate
var prev = node.prevNode
while (prev is SendBuffered<*>) {
if (!prev.remove()) {
prev.helpRemove()
}
prev = prev.prevNode
}
}

// result is always `OFFER_SUCCESS | Closed`
Expand All @@ -35,20 +63,13 @@ internal open class ConflatedChannel<E> : AbstractChannel<E>() {
when {
result === OFFER_SUCCESS -> return OFFER_SUCCESS
result === OFFER_FAILED -> { // try to buffer
val sendResult = sendConflated(element)
when (sendResult) {
when (val sendResult = sendConflated(element)) {
null -> return OFFER_SUCCESS
is Closed<*> -> {
conflatePreviousSendBuffered(sendResult)
return sendResult
}
is Closed<*> -> return sendResult
}
// otherwise there was receiver in queue, retry super.offerInternal
}
result is Closed<*> -> {
conflatePreviousSendBuffered(result)
return result
}
result is Closed<*> -> return result
else -> error("Invalid offerInternal result $result")
}
}
Expand All @@ -64,10 +85,7 @@ internal open class ConflatedChannel<E> : AbstractChannel<E>() {
result === ALREADY_SELECTED -> return ALREADY_SELECTED
result === OFFER_SUCCESS -> return OFFER_SUCCESS
result === OFFER_FAILED -> {} // retry
result is Closed<*> -> {
conflatePreviousSendBuffered(result)
return result
}
result is Closed<*> -> return result
else -> error("Invalid result $result")
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,13 @@ class ConflatedChannelTest : TestBase() {
fun testConflatedClose() = runTest {
val q = Channel<Int>(Channel.CONFLATED)
q.send(1)
q.close() // shall conflate sent item and become closed
q.close() // shall become closed but do not conflate last sent item yet
assertTrue(q.isClosedForSend)
assertFalse(q.isClosedForReceive)
assertEquals(1, q.receive())
// not it is closed for receive, too
assertTrue(q.isClosedForSend)
assertTrue(q.isClosedForReceive)
assertNull(q.receiveOrNull())
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,10 @@ class ChannelFlowTest : TestBase() {
val flow = channelFlow(bufferSize = Channel.CONFLATED) {
assertTrue(offer(1))
assertTrue(offer(2))
assertTrue(offer(3))
assertTrue(offer(4))
}
assertEquals(listOf(1), flow.toList())
assertEquals(listOf(1, 4), flow.toList()) // two elements in the middle got conflated
}

@Test
Expand Down

0 comments on commit a2024f6

Please sign in to comment.