diff --git a/kotlinx-coroutines-core/common/src/CoroutineContext.common.kt b/kotlinx-coroutines-core/common/src/CoroutineContext.common.kt index ae9cb73201..785e8a7691 100644 --- a/kotlinx-coroutines-core/common/src/CoroutineContext.common.kt +++ b/kotlinx-coroutines-core/common/src/CoroutineContext.common.kt @@ -20,5 +20,4 @@ internal expect val DefaultDelay: Delay // countOrElement -- pre-cached value for ThreadContext.kt internal expect inline fun withCoroutineContext(context: CoroutineContext, countOrElement: Any?, block: () -> T): T internal expect fun Continuation<*>.toDebugString(): String -internal expect val CoroutineContext.coroutineName: String? -internal expect fun CoroutineContext.minusId(): CoroutineContext \ No newline at end of file +internal expect val CoroutineContext.coroutineName: String? \ No newline at end of file diff --git a/kotlinx-coroutines-core/common/src/JobSupport.kt b/kotlinx-coroutines-core/common/src/JobSupport.kt index a20774b774..499a27d902 100644 --- a/kotlinx-coroutines-core/common/src/JobSupport.kt +++ b/kotlinx-coroutines-core/common/src/JobSupport.kt @@ -783,6 +783,11 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren if (!tryFinalizeSimpleState(state, proposedUpdate, mode)) return COMPLETING_RETRY return COMPLETING_COMPLETED } + // The separate slow-path function to simplify profiling + return tryMakeCompletingSlowPath(state, proposedUpdate, mode) + } + + private fun tryMakeCompletingSlowPath(state: Incomplete, proposedUpdate: Any?, mode: Int): Int { // get state's list or else promote to list to correctly operate on child lists val list = getOrPromoteCancellingList(state) ?: return COMPLETING_RETRY // promote to Finishing state if we are not in it yet @@ -1202,7 +1207,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren * Class to represent object as the final state of the Job */ private class IncompleteStateBox(@JvmField val state: Incomplete) -private fun Any?.boxIncomplete(): Any? = if (this is Incomplete) IncompleteStateBox(this) else this +internal fun Any?.boxIncomplete(): Any? = if (this is Incomplete) IncompleteStateBox(this) else this internal fun Any?.unboxState(): Any? = (this as? IncompleteStateBox)?.state ?: this // --------------- helper classes & constants for job implementation diff --git a/kotlinx-coroutines-core/common/src/flow/Flow.kt b/kotlinx-coroutines-core/common/src/flow/Flow.kt index d1467d3550..eb67d12f7c 100644 --- a/kotlinx-coroutines-core/common/src/flow/Flow.kt +++ b/kotlinx-coroutines-core/common/src/flow/Flow.kt @@ -27,7 +27,7 @@ import kotlinx.coroutines.* * trigger their evaluation every time [collect] is executed) or hot ones, but, conventionally, they represent cold streams. * Transitions between hot and cold streams are supported via channels and the corresponding API: [flowViaChannel], [broadcastIn], [produceIn]. * - * The flow has a context preserving property: it encapsulates its own execution context and never propagates or leaks it downstream, thus making + * The flow has a context preservation property: it encapsulates its own execution context and never propagates or leaks it downstream, thus making * reasoning about the execution context of particular transformations or terminal operations trivial. * * There are two ways to change the context of a flow: [flowOn][Flow.flowOn] and [flowWith][Flow.flowWith]. @@ -52,24 +52,37 @@ import kotlinx.coroutines.* * } * ``` * - * From the implementation point of view it means that all intermediate operators on [Flow] should abide by the following constraint: - * If collection or emission of a flow is to be separated into multiple coroutines, it should use [coroutineScope] or [supervisorScope] and - * is not allowed to modify the coroutines' context: + * From the implementation point of view it means that all intermediate operators on [Flow] should abide by the following constraints: + * 1) If an operator is trivial and does not start any coroutines, regular [flow] builder should be used. Its implementation + * efficiently enforces all the invariants and prevents most of the development mistakes. + * + * 2) If the collection and emission of the flow are to be separated into multiple coroutines, [channelFlow] should be used. + * [channelFlow] encapsulates all the context preservation work and allows you to focus on your domain-specific problem, + * rather than invariant implementation details. It is possible to use any combination of coroutine builders from within [channelFlow]. + * + * 3) If you are looking for the performance and are sure that no concurrent emits and context jumps will happen, [flow] builder + * alongside with [coroutineScope] or [supervisorScope] can be used instead: + * + * - Scoped primitive should be used to provide a [CoroutineScope] + * - Changing the context of emission is prohibited, no matter whether it is `withContext(ctx)` or builder argument (e.g. `launch(ctx)`) + * - Changing the context of collection is allowed, but it has the same effect as [flowOn] operator and changes the upstream context. + * + * These constraints are enforced by the default [flow] builder. + * Example of the proper `buffer` implementation: * ``` * fun Flow.buffer(bufferSize: Int): Flow = flow { * coroutineScope { // coroutine scope is necessary, withContext is prohibited - * val channel = Channel(bufferSize) - * // GlobalScope.launch { is prohibited - * // launch(Dispatchers.IO) { is prohibited - * launch { // is OK - * collect { value -> + * // GlobalScope.produce { is prohibited + * val channel = produce(bufferSize) { + * collect { value -> // Collect from started coroutine -- OK * channel.send(value) * } - * channel.close() * } * * for (i in channel) { - * emit(i) + * emit(i) // Emission from the enclosing scope -- OK + * // launch { emit(i) } -- prohibited + * // withContext(Dispatchers.IO) { emit(i) } * } * } * } @@ -87,23 +100,10 @@ public interface Flow { * A valid implementation of this method has the following constraints: * 1) It should not change the coroutine context (e.g. with `withContext(Dispatchers.IO)`) when emitting values. * The emission should happen in the context of the [collect] call. - * - * Only coroutine builders that inherit the context are allowed, for example: - * ``` - * class MyFlow : Flow { - * override suspend fun collect(collector: FlowCollector) { - * coroutineScope { - * // Context is inherited - * launch { // Dispatcher is not overridden, fine as well - * collector.emit(42) // Emit from the launched coroutine - * } - * } - * } - * } - * ``` - * is a proper [Flow] implementation, but using `launch(Dispatchers.IO)` is not. + * Please refer to the top-level [Flow] documentation for more details. * * 2) It should serialize calls to [emit][FlowCollector.emit] as [FlowCollector] implementations are not thread safe by default. + * To automatically serialize emissions [channelFlow] builder can be used instead of [flow] */ public suspend fun collect(collector: FlowCollector) } diff --git a/kotlinx-coroutines-core/common/src/flow/FlowCollector.kt b/kotlinx-coroutines-core/common/src/flow/FlowCollector.kt index 2db7dfae63..38d5954ce9 100644 --- a/kotlinx-coroutines-core/common/src/flow/FlowCollector.kt +++ b/kotlinx-coroutines-core/common/src/flow/FlowCollector.kt @@ -18,6 +18,7 @@ public interface FlowCollector { /** * Collects the value emitted by the upstream. + * This method is not thread-safe and should not be invoked concurrently. */ public suspend fun emit(value: T) } diff --git a/kotlinx-coroutines-core/common/src/flow/internal/SafeCollector.kt b/kotlinx-coroutines-core/common/src/flow/internal/SafeCollector.kt index 17306e2ebf..30f5484cad 100644 --- a/kotlinx-coroutines-core/common/src/flow/internal/SafeCollector.kt +++ b/kotlinx-coroutines-core/common/src/flow/internal/SafeCollector.kt @@ -12,18 +12,90 @@ import kotlin.coroutines.* @PublishedApi internal class SafeCollector( private val collector: FlowCollector, - collectContext: CoroutineContext -) : FlowCollector, SynchronizedObject() { + private val collectContext: CoroutineContext +) : FlowCollector { - private val collectContext = collectContext.minusKey(Job).minusId() + // Note, it is non-capturing lambda, so no extra allocation during init of SafeCollector + private val collectContextSize = collectContext.fold(0) { count, _ -> count + 1 } + private var lastEmissionContext: CoroutineContext? = null override suspend fun emit(value: T) { - val emitContext = coroutineContext.minusKey(Job).minusId() - if (emitContext != collectContext) { + /* + * Benign data-race here: + * We read potentially racy published coroutineContext, but we only use it for + * referential comparison (=> thus safe) and are not using it for structural comparisons. + */ + val currentContext = coroutineContext + // This check is triggered once per flow on happy path. + if (lastEmissionContext !== currentContext) { + checkContext(currentContext) + lastEmissionContext = currentContext + } + collector.emit(value) // TCE + } + + private fun checkContext(currentContext: CoroutineContext) { + val result = currentContext.fold(0) fold@{ count, element -> + val key = element.key + val collectElement = collectContext[key] + if (key !== Job) { + return@fold if (element !== collectElement) Int.MIN_VALUE + else count + 1 + } + + val collectJob = collectElement as Job? + val emissionParentJob = (element as Job).transitiveCoroutineParent(collectJob) + /* + * Things like + * ``` + * coroutineScope { + * launch { + * emit(1) + * } + * + * launch { + * emit(2) + * } + * } + * ``` + * are prohibited because 'emit' is not thread-safe by default. Use channelFlow instead if you need concurrent emission + * or want to switch context dynamically (e.g. with `withContext`). + * + * Note that collecting from another coroutine is allowed, e.g.: + * ``` + * coroutineScope { + * val channel = produce { + * collect { value -> + * send(value) + * } + * } + * channel.consumeEach { value -> + * emit(value) + * } + * } + * ``` + * is a completely valid. + */ + if (emissionParentJob !== collectJob) { + error( + "Flow invariant is violated: emission from another coroutine is detected (child of $emissionParentJob, expected child of $collectJob). " + + "FlowCollector is not thread-safe and concurrent emissions are prohibited. To mitigate this restriction please use 'flowChannel' builder instead of 'flow'" + ) + } + count + 1 + } + if (result != collectContextSize) { error( - "Flow invariant is violated: flow was collected in $collectContext, but emission happened in $emitContext. " + - "Please refer to 'flow' documentation or use 'flowOn' instead") + "Flow invariant is violated: flow was collected in $collectContext, but emission happened in $currentContext. " + + "Please refer to 'flow' documentation or use 'flowOn' instead" + ) } - collector.emit(value) + } + + private tailrec fun Job?.transitiveCoroutineParent(collectJob: Job?): Job? { + if (this === null) return null + if (this === collectJob) return this + if (this !is ScopeCoroutine<*>) return this + return parent.transitiveCoroutineParent(collectJob) } } diff --git a/kotlinx-coroutines-core/common/src/internal/Scopes.kt b/kotlinx-coroutines-core/common/src/internal/Scopes.kt index e424c9a2e8..3361694481 100644 --- a/kotlinx-coroutines-core/common/src/internal/Scopes.kt +++ b/kotlinx-coroutines-core/common/src/internal/Scopes.kt @@ -19,6 +19,8 @@ internal open class ScopeCoroutine( final override fun getStackTraceElement(): StackTraceElement? = null override val defaultResumeMode: Int get() = MODE_DIRECT + internal val parent: Job? get() = parentContext[Job] + override val cancelsParent: Boolean get() = false // it throws exception to parent instead of cancelling it diff --git a/kotlinx-coroutines-core/common/test/flow/FlowInvariantsTest.kt b/kotlinx-coroutines-core/common/test/flow/FlowInvariantsTest.kt index 0befdc2227..0659166557 100644 --- a/kotlinx-coroutines-core/common/test/flow/FlowInvariantsTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/FlowInvariantsTest.kt @@ -12,10 +12,9 @@ import kotlin.test.* class FlowInvariantsTest : TestBase() { @Test - fun testWithContextContract() = runTest { + fun testWithContextContract() = runTest({ it is IllegalStateException }) { flow { kotlinx.coroutines.withContext(NonCancellable) { - // This one cannot be prevented :( emit(1) } }.collect { @@ -34,6 +33,27 @@ class FlowInvariantsTest : TestBase() { } } + @Test + fun testCachedInvariantCheckResult() = runTest { + flow { + emit(1) + + try { + kotlinx.coroutines.withContext(NamedDispatchers("foo")) { + emit(1) + } + fail() + } catch (e: IllegalStateException) { + expect(2) + } + + emit(3) + }.collect { + expect(it) + } + finish(4) + } + @Test fun testWithNameContractViolated() = runTest({ it is IllegalStateException }) { flow { @@ -66,7 +86,7 @@ class FlowInvariantsTest : TestBase() { } @Test - fun testScopedJob() = runTest { + fun testScopedJob() = runTest({ it is IllegalStateException }) { flow { emit(1) }.buffer(EmptyCoroutineContext).collect { expect(1) } @@ -83,6 +103,87 @@ class FlowInvariantsTest : TestBase() { finish(2) } + @Test + fun testMergeViolation() = runTest { + fun Flow.merge(other: Flow): Flow = flow { + coroutineScope { + launch { + collect { value -> emit(value) } + } + other.collect { value -> emit(value) } + } + } + + fun Flow.trickyMerge(other: Flow): Flow = flow { + coroutineScope { + launch { + collect { value -> + coroutineScope { emit(value) } + } + } + other.collect { value -> emit(value) } + } + } + + val flow = flowOf(1) + assertFailsWith { flow.merge(flow).toList() } + assertFailsWith { flow.trickyMerge(flow).toList() } + } + + // TODO merge artifact + private fun channelFlow(bufferSize: Int = 16, @BuilderInference block: suspend ProducerScope.() -> Unit): Flow = + flow { + coroutineScope { + val channel = produce(capacity = bufferSize, block = block) + channel.consumeEach { value -> + emit(value) + } + } + } + + @Test + fun testNoMergeViolation() = runTest { + fun Flow.merge(other: Flow): Flow = channelFlow { + launch { + collect { value -> send(value) } + } + other.collect { value -> send(value) } + } + + fun Flow.trickyMerge(other: Flow): Flow = channelFlow { + coroutineScope { + launch { + collect { value -> + coroutineScope { send(value) } + } + } + other.collect { value -> send(value) } + } + } + + val flow = flowOf(1) + assertEquals(listOf(1, 1), flow.merge(flow).toList()) + assertEquals(listOf(1, 1), flow.trickyMerge(flow).toList()) + } + + @Test + fun testScopedCoroutineNoViolation() = runTest { + fun Flow.buffer(): Flow = flow { + coroutineScope { + val channel = produce { + collect { + send(it) + } + } + channel.consumeEach { + emit(it) + } + } + } + + assertEquals(listOf(1, 1), flowOf(1, 1).buffer().toList()) + } + private fun Flow.buffer(coroutineContext: CoroutineContext): Flow = flow { coroutineScope { val channel = Channel() diff --git a/kotlinx-coroutines-core/js/src/CoroutineContext.kt b/kotlinx-coroutines-core/js/src/CoroutineContext.kt index c5f884c7a1..87ed1f4ceb 100644 --- a/kotlinx-coroutines-core/js/src/CoroutineContext.kt +++ b/kotlinx-coroutines-core/js/src/CoroutineContext.kt @@ -47,6 +47,4 @@ public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext): // No debugging facilities on JS internal actual inline fun withCoroutineContext(context: CoroutineContext, countOrElement: Any?, block: () -> T): T = block() internal actual fun Continuation<*>.toDebugString(): String = toString() -internal actual val CoroutineContext.coroutineName: String? get() = null // not supported on JS -@Suppress("NOTHING_TO_INLINE") -internal actual inline fun CoroutineContext.minusId(): CoroutineContext = this \ No newline at end of file +internal actual val CoroutineContext.coroutineName: String? get() = null // not supported on JS \ No newline at end of file diff --git a/kotlinx-coroutines-core/jvm/src/CoroutineContext.kt b/kotlinx-coroutines-core/jvm/src/CoroutineContext.kt index 7a52db7ef7..bd586d6e2a 100644 --- a/kotlinx-coroutines-core/jvm/src/CoroutineContext.kt +++ b/kotlinx-coroutines-core/jvm/src/CoroutineContext.kt @@ -79,9 +79,6 @@ internal actual val CoroutineContext.coroutineName: String? get() { return "$coroutineName#${coroutineId.id}" } -@Suppress("NOTHING_TO_INLINE") -internal actual inline fun CoroutineContext.minusId(): CoroutineContext = minusKey(CoroutineId) - private const val DEBUG_THREAD_NAME_SEPARATOR = " @" internal data class CoroutineId( diff --git a/kotlinx-coroutines-core/native/src/CoroutineContext.kt b/kotlinx-coroutines-core/native/src/CoroutineContext.kt index 0539526dbe..017a01c8f5 100644 --- a/kotlinx-coroutines-core/native/src/CoroutineContext.kt +++ b/kotlinx-coroutines-core/native/src/CoroutineContext.kt @@ -46,6 +46,4 @@ public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext): // No debugging facilities on native internal actual inline fun withCoroutineContext(context: CoroutineContext, countOrElement: Any?, block: () -> T): T = block() internal actual fun Continuation<*>.toDebugString(): String = toString() -internal actual val CoroutineContext.coroutineName: String? get() = null // not supported on native -@Suppress("NOTHING_TO_INLINE") -internal actual inline fun CoroutineContext.minusId(): CoroutineContext = this \ No newline at end of file +internal actual val CoroutineContext.coroutineName: String? get() = null // not supported on native \ No newline at end of file