Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adjust behavior of conflated channel to deliver last value #1239

Merged
merged 1 commit into from
Jun 4, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 1 addition & 31 deletions kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt
Original file line number Diff line number Diff line change
Expand Up @@ -104,35 +104,6 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
return null
}

/**
* Queues conflated element, returns null on success or
* returns node reference if it was already closed or is waiting for receive.
* @suppress **This is unstable API and it is subject to change.**
*/
protected fun sendConflated(element: E): ReceiveOrClosed<*>? {
val node = SendBuffered(element)
queue.addLastIfPrev(node, { prev ->
if (prev is ReceiveOrClosed<*>) return@sendConflated prev
true
})
conflatePreviousSendBuffered(node)
return null
}

protected fun conflatePreviousSendBuffered(node: LockFreeLinkedListNode) {
/*
* Conflate all previous SendBuffered,
* helping other sends to coflate
*/
var prev = node.prevNode
while (prev is SendBuffered<*>) {
if (!prev.remove()) {
prev.helpRemove()
}
prev = prev.prevNode
}
}

/**
* @suppress **This is unstable API and it is subject to change.**
*/
Expand Down Expand Up @@ -331,7 +302,6 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
previous as Receive<E> // type assertion
previous.resumeReceiveClosed(closed)
}

onClosedIdempotent(closed)
}

Expand Down Expand Up @@ -499,7 +469,7 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
override fun toString(): String = "SendSelect($pollResult)[$channel, $select]"
}

private class SendBuffered<out E>(
internal class SendBuffered<out E>(
@JvmField val element: E
) : LockFreeLinkedListNode(), Send {
override val pollResult: Any? get() = element
Expand Down
48 changes: 33 additions & 15 deletions kotlinx-coroutines-core/common/src/channels/ConflatedChannel.kt
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,35 @@ internal open class ConflatedChannel<E> : AbstractChannel<E>() {
protected final override val isBufferFull: Boolean get() = false

override fun onClosedIdempotent(closed: LockFreeLinkedListNode) {
conflatePreviousSendBuffered(closed)
@Suppress("UNCHECKED_CAST")
(closed.prevNode as? SendBuffered<E>)?.let { lastBuffered ->
conflatePreviousSendBuffered(lastBuffered)
}
}

/**
* Queues conflated element, returns null on success or
* returns node reference if it was already closed or is waiting for receive.
*/
private fun sendConflated(element: E): ReceiveOrClosed<*>? {
val node = SendBuffered(element)
queue.addLastIfPrev(node) { prev ->
if (prev is ReceiveOrClosed<*>) return@sendConflated prev
true
}
conflatePreviousSendBuffered(node)
return null
}

private fun conflatePreviousSendBuffered(node: SendBuffered<E>) {
// Conflate all previous SendBuffered, helping other sends to conflate
var prev = node.prevNode
while (prev is SendBuffered<*>) {
if (!prev.remove()) {
prev.helpRemove()
}
prev = prev.prevNode
}
}

// result is always `OFFER_SUCCESS | Closed`
Expand All @@ -35,20 +63,13 @@ internal open class ConflatedChannel<E> : AbstractChannel<E>() {
when {
result === OFFER_SUCCESS -> return OFFER_SUCCESS
result === OFFER_FAILED -> { // try to buffer
val sendResult = sendConflated(element)
when (sendResult) {
when (val sendResult = sendConflated(element)) {
null -> return OFFER_SUCCESS
is Closed<*> -> {
conflatePreviousSendBuffered(sendResult)
return sendResult
}
is Closed<*> -> return sendResult
}
// otherwise there was receiver in queue, retry super.offerInternal
}
result is Closed<*> -> {
conflatePreviousSendBuffered(result)
return result
}
result is Closed<*> -> return result
else -> error("Invalid offerInternal result $result")
}
}
Expand All @@ -64,10 +85,7 @@ internal open class ConflatedChannel<E> : AbstractChannel<E>() {
result === ALREADY_SELECTED -> return ALREADY_SELECTED
result === OFFER_SUCCESS -> return OFFER_SUCCESS
result === OFFER_FAILED -> {} // retry
result is Closed<*> -> {
conflatePreviousSendBuffered(result)
return result
}
result is Closed<*> -> return result
else -> error("Invalid result $result")
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,13 @@ class ConflatedChannelTest : TestBase() {
fun testConflatedClose() = runTest {
val q = Channel<Int>(Channel.CONFLATED)
q.send(1)
q.close() // shall conflate sent item and become closed
q.close() // shall become closed but do not conflate last sent item yet
assertTrue(q.isClosedForSend)
assertFalse(q.isClosedForReceive)
assertEquals(1, q.receive())
// not it is closed for receive, too
assertTrue(q.isClosedForSend)
assertTrue(q.isClosedForReceive)
assertNull(q.receiveOrNull())
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,10 @@ class ChannelFlowTest : TestBase() {
val flow = channelFlow(bufferSize = Channel.CONFLATED) {
assertTrue(offer(1))
assertTrue(offer(2))
assertTrue(offer(3))
assertTrue(offer(4))
}
assertEquals(listOf(1), flow.toList())
assertEquals(listOf(1, 4), flow.toList()) // two elements in the middle got conflated
}

@Test
Expand Down