Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update the documentation for the Channel interface #4241

Merged
merged 13 commits into from
Oct 25, 2024
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ package kotlinx.coroutines.channels
*/
public enum class BufferOverflow {
/**
* Suspend on buffer overflow.
* Suspend until free space appears in the buffer.
*
* Use this to create backpressure, forcing the producers to slow down creation of new values in response to
* consumers not being able to process the incoming values in time.
Expand Down
1,098 changes: 868 additions & 230 deletions kotlinx-coroutines-core/common/src/channels/Channel.kt

Large diffs are not rendered by default.

10 changes: 5 additions & 5 deletions kotlinx-coroutines-core/common/src/channels/Produce.kt
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ public suspend fun ProducerScope<*>.awaitClose(block: () -> Unit = {}) {
* The kind of the resulting channel depends on the specified [capacity] parameter.
* See the [Channel] interface documentation for details.
* By default, an unbuffered channel is created.
* If an invalid [capacity] value is specified, an [IllegalArgumentException] is thrown.
*
* ### Behavior on termination
*
Expand Down Expand Up @@ -114,9 +115,9 @@ public suspend fun ProducerScope<*>.awaitClose(block: () -> Unit = {}) {
* channel.cancel()
* ```
*
* If this coroutine finishes with an exception, it will close the channel with that exception as the cause and
* the resulting channel will become _failed_, so after receiving all the existing elements, all further attempts
* to receive from it will throw the exception with which the coroutine finished.
* If this coroutine finishes with an exception, it will close the channel with that exception as the cause,
* so after receiving all the existing elements,
* all further attempts to receive from it will throw the exception with which the coroutine finished.
*
* ```
* val produceJob = Job()
Expand Down Expand Up @@ -150,8 +151,7 @@ public suspend fun ProducerScope<*>.awaitClose(block: () -> Unit = {}) {
* } // throws a `CancellationException` exception after reaching -1
* ```
*
* Note that cancelling `produce` via structured concurrency closes the channel with a cause,
* making it a _failed_ channel.
* Note that cancelling `produce` via structured concurrency closes the channel with a cause.
*
* The behavior around coroutine cancellation and error handling is experimental and may change in a future release.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,20 @@ import kotlinx.coroutines.*
import kotlin.test.*

class BufferedChannelTest : TestBase() {

/** Tests that a buffered channel does not consume enough memory to fail with an OOM. */
@Test
fun testMemoryConsumption() = runTest {
val largeChannel = Channel<Int>(Int.MAX_VALUE / 2)
repeat(10_000) {
largeChannel.send(it)
}
repeat(10_000) {
val element = largeChannel.receive()
assertEquals(it, element)
}
}

@Test
fun testIteratorHasNextIsIdempotent() = runTest {
val q = Channel<Int>()
Expand Down
7 changes: 7 additions & 0 deletions kotlinx-coroutines-core/common/test/channels/ProduceTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,13 @@ class ProduceTest : TestBase() {
}
}

@Test
fun testProduceWithInvalidCapacity() = runTest {
assertFailsWith<IllegalArgumentException> {
produce<Int>(capacity = -3) { }
}
}

private suspend fun cancelOnCompletion(coroutineContext: CoroutineContext) = CoroutineScope(coroutineContext).apply {
val source = Channel<Int>()
expect(1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,4 +273,24 @@ class RendezvousChannelTest : TestBase() {
channel.cancel(TestCancellationException())
channel.receiveCatching().getOrThrow()
}

/** Tests that [BufferOverflow.DROP_OLDEST] takes precedence over [Channel.RENDEZVOUS]. */
@Test
fun testDropOldest() = runTest {
val channel = Channel<Int>(Channel.RENDEZVOUS, onBufferOverflow = BufferOverflow.DROP_OLDEST)
channel.send(1)
channel.send(2)
channel.send(3)
assertEquals(3, channel.receive())
}

/** Tests that [BufferOverflow.DROP_LATEST] takes precedence over [Channel.RENDEZVOUS]. */
@Test
fun testDropLatest() = runTest {
val channel = Channel<Int>(Channel.RENDEZVOUS, onBufferOverflow = BufferOverflow.DROP_LATEST)
channel.send(1)
channel.send(2)
channel.send(3)
assertEquals(1, channel.receive())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -419,10 +419,10 @@ class SelectRendezvousChannelTest : TestBase() {
fun testSelectSendWhenClosed() = runTest {
expect(1)
val c = Channel<Int>(Channel.RENDEZVOUS)
val sender = launch(start = CoroutineStart.UNDISPATCHED) {
launch(start = CoroutineStart.UNDISPATCHED) {
expect(2)
c.send(1) // enqueue sender
expectUnreached()
finish(4)
}
c.close() // then close
assertFailsWith<ClosedSendChannelException> {
Expand All @@ -434,8 +434,7 @@ class SelectRendezvousChannelTest : TestBase() {
}
}
}
sender.cancel()
finish(4)
assertEquals(1, c.receive())
}

// only for debugging
Expand Down
4 changes: 2 additions & 2 deletions kotlinx-coroutines-core/jvm/src/channels/Actor.kt
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ public interface ActorScope<E> : CoroutineScope, ReceiveChannel<E> {
* it will be started implicitly on the first message
* [sent][SendChannel.send] to this actors's mailbox channel.
*
* Uncaught exceptions in this coroutine close the channel with this exception as a cause and
* the resulting channel becomes _failed_, so that any attempt to send to such a channel throws exception.
* Uncaught exceptions in this coroutine close the channel with this exception as a cause,
* so that any attempt to send to such a channel throws exception.
*
* The kind of the resulting channel depends on the specified [capacity] parameter.
* See [Channel] interface documentation for details.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package kotlinx.coroutines.channels

import kotlinx.coroutines.*
import kotlinx.coroutines.testing.*
import kotlin.test.*

class CancelledChannelLeakTest : TestBase() {
/**
* Tests that cancellation removes the elements from the channel's buffer.
*/
@Test
fun testBufferedChannelLeak() = runTest {
for (capacity in listOf(Channel.CONFLATED, Channel.RENDEZVOUS, 1, 2, 5, 10)) {
val channel = Channel<X>(capacity)
val value = X()
launch(start = CoroutineStart.UNDISPATCHED) {
channel.send(value)
}
FieldWalker.assertReachableCount(1, channel) { it === value }
channel.cancel()
// the element must be removed so that there is no memory leak
FieldWalker.assertReachableCount(0, channel) { it === value }
}
}

class X
}