Skip to content

Commit

Permalink
Introduce SharedFlow and sharing operators (Kotlin#2069)
Browse files Browse the repository at this point in the history
* Introduce SharedFlow and sharing operators

Summary of changes:
* SharedFlow, MutableSharedFlow and its constructor.
* StateFlow implements SharedFlow.
* SharedFlow.onSubscription operator, clarified docs in other onXxx operators.
* BufferOverflow strategy in kotlinx.coroutines.channels package.
* shareIn and stateIn operators and SharingStarted strategies for them.
* SharedFlow.flowOn error lint (up from StateFlow).
* Precise cancellable() operator fusion.
* Precise distinctUntilChanged() operator fusion.
* StateFlow.compareAndSet function.
* asStateFlow and asSharedFlow read-only view functions.
* Consistently clarified docs on cold vs hot flows.
* Future deprecation notice for BroadcastChannel, ConflatedBroadcastChannel, broadcast, and broadcastIn.
* Channel(...) constructor function has onBufferOverflow parameter.
* buffer(...) operator has onBufferOverflow parameter.
* shareIn/stateIn buffer and overflow strategy are configured via upstream buffer operators.
* shareIn/stateIn fuse with upstream flowOn for more efficient execution.
* conflate() is implemented as buffer(onBufferOverflow=KEEP_LATEST), non-suspending strategies are reasonably supported with 0 and default capacities.
* Added reactive operator migration hints.
* WhileSubscribed with kotlin.time.Duration params

Fixes Kotlin#2034
Fixes Kotlin#2047

Co-authored-by: Ibraheem Zaman <1zaman@users.noreply.github.com>
Co-authored-by: Thomas Vos <thomasjsvos@gmail.com>
Co-authored-by: Travis Wyatt <travis.i.wyatt@gmail.com>
  • Loading branch information
4 people authored and recheej committed Dec 28, 2020
1 parent 24edb02 commit 836bc56
Show file tree
Hide file tree
Showing 58 changed files with 4,879 additions and 422 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ suspend fun main() = coroutineScope {
* [DebugProbes] API to probe, keep track of, print and dump active coroutines;
* [CoroutinesTimeout] test rule to automatically dump coroutines on test timeout.
* [reactive](reactive/README.md) &mdash; modules that provide builders and iteration support for various reactive streams libraries:
* Reactive Streams ([Publisher.collect], [Publisher.awaitSingle], [publish], etc),
* Reactive Streams ([Publisher.collect], [Publisher.awaitSingle], [kotlinx.coroutines.reactive.publish], etc),
* Flow (JDK 9) (the same interface as for Reactive Streams),
* RxJava 2.x ([rxFlowable], [rxSingle], etc), and
* RxJava 3.x ([rxFlowable], [rxSingle], etc), and
Expand Down Expand Up @@ -302,7 +302,7 @@ The `develop` branch is pushed to `master` during release.
<!--- INDEX kotlinx.coroutines.reactive -->
[Publisher.collect]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.reactive/org.reactivestreams.-publisher/collect.html
[Publisher.awaitSingle]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.reactive/org.reactivestreams.-publisher/await-single.html
[publish]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.reactive/publish.html
[kotlinx.coroutines.reactive.publish]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.reactive/publish.html
<!--- MODULE kotlinx-coroutines-rx2 -->
<!--- INDEX kotlinx.coroutines.rx2 -->
[rxFlowable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/rx-flowable.html
Expand Down
92 changes: 79 additions & 13 deletions kotlinx-coroutines-core/api/kotlinx-coroutines-core.api

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion kotlinx-coroutines-core/common/src/CompletableDeferred.kt
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import kotlinx.coroutines.selects.*
* All functions on this interface are **thread-safe** and can
* be safely invoked from concurrent coroutines without external synchronization.
*
* **`CompletableDeferred` interface is not stable for inheritance in 3rd party libraries**,
* **The `CompletableDeferred` interface is not stable for inheritance in 3rd party libraries**,
* as new methods might be added to this interface in the future, but is stable for use.
*/
public interface CompletableDeferred<T> : Deferred<T> {
Expand Down
2 changes: 1 addition & 1 deletion kotlinx-coroutines-core/common/src/CompletableJob.kt
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ package kotlinx.coroutines
* All functions on this interface are **thread-safe** and can
* be safely invoked from concurrent coroutines without external synchronization.
*
* **`CompletableJob` interface is not stable for inheritance in 3rd party libraries**,
* **The `CompletableJob` interface is not stable for inheritance in 3rd party libraries**,
* as new methods might be added to this interface in the future, but is stable for use.
*/
public interface CompletableJob : Job {
Expand Down
5 changes: 4 additions & 1 deletion kotlinx-coroutines-core/common/src/Delay.kt
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,10 @@ public suspend fun awaitCancellation(): Nothing = suspendCancellableCoroutine {}
public suspend fun delay(timeMillis: Long) {
if (timeMillis <= 0) return // don't delay
return suspendCancellableCoroutine sc@ { cont: CancellableContinuation<Unit> ->
cont.context.delay.scheduleResumeAfterDelay(timeMillis, cont)
// if timeMillis == Long.MAX_VALUE then just wait forever like awaitCancellation, don't schedule.
if (timeMillis < Long.MAX_VALUE) {
cont.context.delay.scheduleResumeAfterDelay(timeMillis, cont)
}
}
}

Expand Down
10 changes: 5 additions & 5 deletions kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt
Original file line number Diff line number Diff line change
Expand Up @@ -1019,23 +1019,23 @@ internal val EMPTY = Symbol("EMPTY") // marker for Conflated & Buffered channels

@JvmField
@SharedImmutable
internal val OFFER_SUCCESS: Any = Symbol("OFFER_SUCCESS")
internal val OFFER_SUCCESS = Symbol("OFFER_SUCCESS")

@JvmField
@SharedImmutable
internal val OFFER_FAILED: Any = Symbol("OFFER_FAILED")
internal val OFFER_FAILED = Symbol("OFFER_FAILED")

@JvmField
@SharedImmutable
internal val POLL_FAILED: Any = Symbol("POLL_FAILED")
internal val POLL_FAILED = Symbol("POLL_FAILED")

@JvmField
@SharedImmutable
internal val ENQUEUE_FAILED: Any = Symbol("ENQUEUE_FAILED")
internal val ENQUEUE_FAILED = Symbol("ENQUEUE_FAILED")

@JvmField
@SharedImmutable
internal val HANDLER_INVOKED: Any = Symbol("ON_CLOSE_HANDLER_INVOKED")
internal val HANDLER_INVOKED = Symbol("ON_CLOSE_HANDLER_INVOKED")

internal typealias Handler = (Throwable?) -> Unit

Expand Down
133 changes: 78 additions & 55 deletions kotlinx-coroutines-core/common/src/channels/ArrayChannel.kt
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,18 @@ internal open class ArrayChannel<E>(
/**
* Buffer capacity.
*/
val capacity: Int,
private val capacity: Int,
private val onBufferOverflow: BufferOverflow,
onUndeliveredElement: OnUndeliveredElement<E>?
) : AbstractChannel<E>(onUndeliveredElement) {
init {
// This check is actually used by the Channel(...) constructor function which checks only for known
// capacities and calls ArrayChannel constructor for everything else.
require(capacity >= 1) { "ArrayChannel capacity must be at least 1, but $capacity was specified" }
}

private val lock = ReentrantLock()

/*
* Guarded by lock.
* Allocate minimum of capacity and 16 to avoid excess memory pressure for large channels when it's not necessary.
Expand All @@ -43,7 +47,7 @@ internal open class ArrayChannel<E>(
protected final override val isBufferAlwaysEmpty: Boolean get() = false
protected final override val isBufferEmpty: Boolean get() = size.value == 0
protected final override val isBufferAlwaysFull: Boolean get() = false
protected final override val isBufferFull: Boolean get() = size.value == capacity
protected final override val isBufferFull: Boolean get() = size.value == capacity && onBufferOverflow == BufferOverflow.SUSPEND

override val isFull: Boolean get() = lock.withLock { isFullImpl }
override val isEmpty: Boolean get() = lock.withLock { isEmptyImpl }
Expand All @@ -55,31 +59,26 @@ internal open class ArrayChannel<E>(
lock.withLock {
val size = this.size.value
closedForSend?.let { return it }
if (size < capacity) {
// tentatively put element to buffer
this.size.value = size + 1 // update size before checking queue (!!!)
// check for receivers that were waiting on empty queue
if (size == 0) {
loop@ while (true) {
receive = takeFirstReceiveOrPeekClosed() ?: break@loop // break when no receivers queued
if (receive is Closed) {
this.size.value = size // restore size
return receive!!
}
val token = receive!!.tryResumeReceive(element, null)
if (token != null) {
assert { token === RESUME_TOKEN }
this.size.value = size // restore size
return@withLock
}
// update size before checking queue (!!!)
updateBufferSize(size)?.let { return it }
// check for receivers that were waiting on empty queue
if (size == 0) {
loop@ while (true) {
receive = takeFirstReceiveOrPeekClosed() ?: break@loop // break when no receivers queued
if (receive is Closed) {
this.size.value = size // restore size
return receive!!
}
val token = receive!!.tryResumeReceive(element, null)
if (token != null) {
assert { token === RESUME_TOKEN }
this.size.value = size // restore size
return@withLock
}
}
ensureCapacity(size)
buffer[(head + size) % buffer.size] = element // actually queue element
return OFFER_SUCCESS
}
// size == capacity: full
return OFFER_FAILED
enqueueElement(size, element)
return OFFER_SUCCESS
}
// breaks here if offer meets receiver
receive!!.completeResumeReceive(element)
Expand All @@ -92,41 +91,36 @@ internal open class ArrayChannel<E>(
lock.withLock {
val size = this.size.value
closedForSend?.let { return it }
if (size < capacity) {
// tentatively put element to buffer
this.size.value = size + 1 // update size before checking queue (!!!)
// check for receivers that were waiting on empty queue
if (size == 0) {
loop@ while (true) {
val offerOp = describeTryOffer(element)
val failure = select.performAtomicTrySelect(offerOp)
when {
failure == null -> { // offered successfully
this.size.value = size // restore size
receive = offerOp.result
return@withLock
}
failure === OFFER_FAILED -> break@loop // cannot offer -> Ok to queue to buffer
failure === RETRY_ATOMIC -> {} // retry
failure === ALREADY_SELECTED || failure is Closed<*> -> {
this.size.value = size // restore size
return failure
}
else -> error("performAtomicTrySelect(describeTryOffer) returned $failure")
// update size before checking queue (!!!)
updateBufferSize(size)?.let { return it }
// check for receivers that were waiting on empty queue
if (size == 0) {
loop@ while (true) {
val offerOp = describeTryOffer(element)
val failure = select.performAtomicTrySelect(offerOp)
when {
failure == null -> { // offered successfully
this.size.value = size // restore size
receive = offerOp.result
return@withLock
}
failure === OFFER_FAILED -> break@loop // cannot offer -> Ok to queue to buffer
failure === RETRY_ATOMIC -> {} // retry
failure === ALREADY_SELECTED || failure is Closed<*> -> {
this.size.value = size // restore size
return failure
}
else -> error("performAtomicTrySelect(describeTryOffer) returned $failure")
}
}
// let's try to select sending this element to buffer
if (!select.trySelect()) { // :todo: move trySelect completion outside of lock
this.size.value = size // restore size
return ALREADY_SELECTED
}
ensureCapacity(size)
buffer[(head + size) % buffer.size] = element // actually queue element
return OFFER_SUCCESS
}
// size == capacity: full
return OFFER_FAILED
// let's try to select sending this element to buffer
if (!select.trySelect()) { // :todo: move trySelect completion outside of lock
this.size.value = size // restore size
return ALREADY_SELECTED
}
enqueueElement(size, element)
return OFFER_SUCCESS
}
// breaks here if offer meets receiver
receive!!.completeResumeReceive(element)
Expand All @@ -137,6 +131,35 @@ internal open class ArrayChannel<E>(
super.enqueueSend(send)
}

// Guarded by lock
// Result is `OFFER_SUCCESS | OFFER_FAILED | null`
private fun updateBufferSize(currentSize: Int): Symbol? {
if (currentSize < capacity) {
size.value = currentSize + 1 // tentatively put it into the buffer
return null // proceed
}
// buffer is full
return when (onBufferOverflow) {
BufferOverflow.SUSPEND -> OFFER_FAILED
BufferOverflow.DROP_LATEST -> OFFER_SUCCESS
BufferOverflow.DROP_OLDEST -> null // proceed, will drop oldest in enqueueElement
}
}

// Guarded by lock
private fun enqueueElement(currentSize: Int, element: E) {
if (currentSize < capacity) {
ensureCapacity(currentSize)
buffer[(head + currentSize) % buffer.size] = element // actually queue element
} else {
// buffer is full
assert { onBufferOverflow == BufferOverflow.DROP_OLDEST } // the only way we can get here
buffer[head % buffer.size] = null // drop oldest element
buffer[(head + currentSize) % buffer.size] = element // actually queue element
head = (head + 1) % buffer.size
}
}

// Guarded by lock
private fun ensureCapacity(currentSize: Int) {
if (currentSize >= buffer.size) {
Expand Down
7 changes: 4 additions & 3 deletions kotlinx-coroutines-core/common/src/channels/Broadcast.kt
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED
import kotlinx.coroutines.intrinsics.*
import kotlin.coroutines.*
import kotlin.coroutines.intrinsics.*
import kotlin.native.concurrent.*

/**
* Broadcasts all elements of the channel.
Expand All @@ -34,8 +33,10 @@ import kotlin.native.concurrent.*
*
* This function has an inappropriate result type of [BroadcastChannel] which provides
* [send][BroadcastChannel.send] and [close][BroadcastChannel.close] operations that interfere with
* the broadcasting coroutine in hard-to-specify ways. It will be replaced with
* sharing operators on [Flow][kotlinx.coroutines.flow.Flow] in the future.
* the broadcasting coroutine in hard-to-specify ways.
*
* **Note: This API is obsolete.** It will be deprecated and replaced with the
* [Flow.shareIn][kotlinx.coroutines.flow.shareIn] operator when it becomes stable.
*
* @param start coroutine start option. The default value is [CoroutineStart.LAZY].
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@
package kotlinx.coroutines.channels

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel.Factory.CONFLATED
import kotlinx.coroutines.channels.Channel.Factory.BUFFERED
import kotlinx.coroutines.channels.Channel.Factory.CHANNEL_DEFAULT_CAPACITY
import kotlinx.coroutines.channels.Channel.Factory.CONFLATED
import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED

/**
Expand All @@ -20,9 +20,10 @@ import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED
* See `BroadcastChannel()` factory function for the description of available
* broadcast channel implementations.
*
* **Note: This is an experimental api.** It may be changed in the future updates.
* **Note: This API is obsolete.** It will be deprecated and replaced by [SharedFlow][kotlinx.coroutines.flow.SharedFlow]
* when it becomes stable.
*/
@ExperimentalCoroutinesApi
@ExperimentalCoroutinesApi // not @ObsoleteCoroutinesApi to reduce burden for people who are still using it
public interface BroadcastChannel<E> : SendChannel<E> {
/**
* Subscribes to this [BroadcastChannel] and returns a channel to receive elements from it.
Expand Down
36 changes: 36 additions & 0 deletions kotlinx-coroutines-core/common/src/channels/BufferOverflow.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines.channels

import kotlinx.coroutines.*

/**
* A strategy for buffer overflow handling in [channels][Channel] and [flows][kotlinx.coroutines.flow.Flow] that
* controls what is going to be sacrificed on buffer overflow:
*
* * [SUSPEND] &mdash; the upstream that is [sending][SendChannel.send] or
* is [emitting][kotlinx.coroutines.flow.FlowCollector.emit] a value is **suspended** while the buffer is full.
* * [DROP_OLDEST] &mdash; drop **the oldest** value in the buffer on overflow, add the new value to the buffer, do not suspend.
* * [DROP_LATEST] &mdash; drop **the latest** value that is being added to the buffer right now on buffer overflow
* (so that buffer contents stay the same), do not suspend.
*/
@ExperimentalCoroutinesApi
public enum class BufferOverflow {
/**
* Suspend on buffer overflow.
*/
SUSPEND,

/**
* Drop **the oldest** value in the buffer on overflow, add the new value to the buffer, do not suspend.
*/
DROP_OLDEST,

/**
* Drop **the latest** value that is being added to the buffer right now on buffer overflow
* (so that buffer contents stay the same), do not suspend.
*/
DROP_LATEST
}
Loading

0 comments on commit 836bc56

Please sign in to comment.