From 4ca36dd59d196c782e3e5fd09f5e676886025f1a Mon Sep 17 00:00:00 2001 From: Dmitry Khalanskiy Date: Thu, 8 Feb 2024 10:37:12 +0100 Subject: [PATCH 1/3] Ensure that flow operators propagate the cancellation exceptions Before this change, it could happen that some size-limiting operators upstream swallowed the requests to limit the flow size emitted by the operators downstream. This could cause `onCompletion` calls between these operators to incorrectly report that the flow was not in fact limited by the downstream operators. Additionally, in the presence of additional size-limiting operators in the chain, `first` and `single` and their variants could exhibit incorrect behavior where emitting a value from `onCompletion` would overwrite their output. Fixes #4035 --- .../common/src/flow/internal/Combine.kt | 6 +- .../flow/internal/FlowExceptions.common.kt | 6 +- .../common/src/flow/operators/Limit.kt | 9 +-- .../test/flow/operators/OnCompletionTest.kt | 62 +++++++++++++++++++ .../src/flow/internal/FlowExceptions.kt | 2 +- .../jvm/src/flow/internal/FlowExceptions.kt | 2 +- .../src/flow/internal/FlowExceptions.kt | 2 +- 7 files changed, 76 insertions(+), 13 deletions(-) diff --git a/kotlinx-coroutines-core/common/src/flow/internal/Combine.kt b/kotlinx-coroutines-core/common/src/flow/internal/Combine.kt index eb82426760..cd764a5f61 100644 --- a/kotlinx-coroutines-core/common/src/flow/internal/Combine.kt +++ b/kotlinx-coroutines-core/common/src/flow/internal/Combine.kt @@ -102,7 +102,7 @@ internal fun zipImpl(flow: Flow, flow2: Flow, transform: sus val collectJob = Job() (second as SendChannel<*>).invokeOnClose { // Optimization to avoid AFE allocation when the other flow is done - if (collectJob.isActive) collectJob.cancel(AbortFlowException(this@unsafeFlow)) + if (collectJob.isActive) collectJob.cancel(AbortFlowException(collectJob)) } try { @@ -124,14 +124,14 @@ internal fun zipImpl(flow: Flow, flow2: Flow, transform: sus flow.collect { value -> withContextUndispatched(scopeContext, Unit, cnt) { val otherValue = second.receiveCatching().getOrElse { - throw it ?:AbortFlowException(this@unsafeFlow) + throw it ?:AbortFlowException(collectJob) } emit(transform(value, NULL.unbox(otherValue))) } } } } catch (e: AbortFlowException) { - e.checkOwnership(owner = this@unsafeFlow) + e.checkOwnership(owner = collectJob) } finally { second.cancel() } diff --git a/kotlinx-coroutines-core/common/src/flow/internal/FlowExceptions.common.kt b/kotlinx-coroutines-core/common/src/flow/internal/FlowExceptions.common.kt index 999e117e7c..827562de4b 100644 --- a/kotlinx-coroutines-core/common/src/flow/internal/FlowExceptions.common.kt +++ b/kotlinx-coroutines-core/common/src/flow/internal/FlowExceptions.common.kt @@ -9,11 +9,11 @@ import kotlinx.coroutines.flow.* * This exception can be safely ignored by non-terminal flow operator if and only if it was caught by its owner * (see usages of [checkOwnership]). */ -internal expect class AbortFlowException(owner: FlowCollector<*>) : CancellationException { - public val owner: FlowCollector<*> +internal expect class AbortFlowException(owner: Any) : CancellationException { + val owner: Any } -internal fun AbortFlowException.checkOwnership(owner: FlowCollector<*>) { +internal fun AbortFlowException.checkOwnership(owner: Any) { if (this.owner !== owner) throw this } diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Limit.kt b/kotlinx-coroutines-core/common/src/flow/operators/Limit.kt index 3d489aa946..d40a92d00b 100644 --- a/kotlinx-coroutines-core/common/src/flow/operators/Limit.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Limit.kt @@ -46,6 +46,7 @@ public fun Flow.dropWhile(predicate: suspend (T) -> Boolean): Flow = f public fun Flow.take(count: Int): Flow { require(count > 0) { "Requested element count $count should be positive" } return flow { + val ownershipMarker = Any() var consumed = 0 try { collect { value -> @@ -56,18 +57,18 @@ public fun Flow.take(count: Int): Flow { if (++consumed < count) { return@collect emit(value) } else { - return@collect emitAbort(value) + return@collect emitAbort(value, ownershipMarker) } } } catch (e: AbortFlowException) { - e.checkOwnership(owner = this) + e.checkOwnership(owner = ownershipMarker) } } } -private suspend fun FlowCollector.emitAbort(value: T) { +private suspend fun FlowCollector.emitAbort(value: T, ownershipMarker: Any) { emit(value) - throw AbortFlowException(this) + throw AbortFlowException(ownershipMarker) } /** diff --git a/kotlinx-coroutines-core/common/test/flow/operators/OnCompletionTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/OnCompletionTest.kt index 8fc9285f5d..38b3867c51 100644 --- a/kotlinx-coroutines-core/common/test/flow/operators/OnCompletionTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/operators/OnCompletionTest.kt @@ -308,4 +308,66 @@ class OnCompletionTest : TestBase() { .take(1) .collect() } + + /** + * Tests that the operators that are used to limit the flow (like [take] and [zip]) faithfully propagate the + * cancellation exception to the original owner. + */ + @Test + fun testOnCompletionBetweenLimitingOperators() = runTest { + // `zip` doesn't eat the exception thrown by `take`: + flowOf(1, 2, 3) + .zip(flowOf(4, 5)) { a, b -> a + b } + .onCompletion { + expect(2) + assertNotNull(it) + } + .take(1) + .collect { + expect(1) + } + + // `take` doesn't eat the exception thrown by `zip`: + flowOf(1, 2, 3) + .take(2) + .onCompletion { + expect(4) + assertNotNull(it) + } + .zip(flowOf(4)) { a, b -> a + b } + .collect { + expect(3) + } + + // `take` doesn't eat the exception thrown by `first`: + flowOf(1, 2, 3) + .take(2) + .onCompletion { + expect(5) + assertNotNull(it) + } + .first() + + // `zip` doesn't eat the exception thrown by `first`: + flowOf(1, 2, 3) + .zip(flowOf(4, 5)) { a, b -> a + b } + .onCompletion { + expect(6) + assertNotNull(it) + } + .first() + + finish(7) + } + + /** + * Tests that emitting new elements after completion doesn't overwrite the old elements. + */ + @Test + fun testEmittingElementsAfterCancellation() = runTest { + assertEquals(1, flowOf(1, 2, 3) + .take(100) + .onCompletion { emit(4) } + .first()) + } } diff --git a/kotlinx-coroutines-core/jsAndWasmShared/src/flow/internal/FlowExceptions.kt b/kotlinx-coroutines-core/jsAndWasmShared/src/flow/internal/FlowExceptions.kt index 722911127e..0e780f53a0 100644 --- a/kotlinx-coroutines-core/jsAndWasmShared/src/flow/internal/FlowExceptions.kt +++ b/kotlinx-coroutines-core/jsAndWasmShared/src/flow/internal/FlowExceptions.kt @@ -4,6 +4,6 @@ import kotlinx.coroutines.* import kotlinx.coroutines.flow.* internal actual class AbortFlowException actual constructor( - actual val owner: FlowCollector<*> + actual val owner: Any ) : CancellationException("Flow was aborted, no more elements needed") internal actual class ChildCancelledException : CancellationException("Child of the scoped flow was cancelled") diff --git a/kotlinx-coroutines-core/jvm/src/flow/internal/FlowExceptions.kt b/kotlinx-coroutines-core/jvm/src/flow/internal/FlowExceptions.kt index 5ec064c35b..e6f3453359 100644 --- a/kotlinx-coroutines-core/jvm/src/flow/internal/FlowExceptions.kt +++ b/kotlinx-coroutines-core/jvm/src/flow/internal/FlowExceptions.kt @@ -4,7 +4,7 @@ import kotlinx.coroutines.* import kotlinx.coroutines.flow.* internal actual class AbortFlowException actual constructor( - @JvmField @Transient actual val owner: FlowCollector<*> + @JvmField @Transient actual val owner: Any ) : CancellationException("Flow was aborted, no more elements needed") { override fun fillInStackTrace(): Throwable { diff --git a/kotlinx-coroutines-core/native/src/flow/internal/FlowExceptions.kt b/kotlinx-coroutines-core/native/src/flow/internal/FlowExceptions.kt index ebb47015ec..1109b15fe4 100644 --- a/kotlinx-coroutines-core/native/src/flow/internal/FlowExceptions.kt +++ b/kotlinx-coroutines-core/native/src/flow/internal/FlowExceptions.kt @@ -4,7 +4,7 @@ import kotlinx.coroutines.* import kotlinx.coroutines.flow.* internal actual class AbortFlowException actual constructor( - actual val owner: FlowCollector<*> + actual val owner: Any ) : CancellationException("Flow was aborted, no more elements needed") internal actual class ChildCancelledException : CancellationException("Child of the scoped flow was cancelled") From 18459905f97232d4bc2f99167312fe4b20791ecf Mon Sep 17 00:00:00 2001 From: Dmitry Khalanskiy Date: Wed, 14 Feb 2024 14:20:41 +0100 Subject: [PATCH 2/3] Document AbortFlowException more thoroughly --- .../common/src/flow/internal/FlowExceptions.common.kt | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/kotlinx-coroutines-core/common/src/flow/internal/FlowExceptions.common.kt b/kotlinx-coroutines-core/common/src/flow/internal/FlowExceptions.common.kt index 827562de4b..628296c3e2 100644 --- a/kotlinx-coroutines-core/common/src/flow/internal/FlowExceptions.common.kt +++ b/kotlinx-coroutines-core/common/src/flow/internal/FlowExceptions.common.kt @@ -4,10 +4,11 @@ import kotlinx.coroutines.* import kotlinx.coroutines.flow.* /** - * This exception is thrown when operator need no more elements from the flow. - * This exception should never escape outside of operator's implementation. + * This exception is thrown when an operator needs no more elements from the flow. + * The operator should never allow this exception to be thrown past its own boundary. * This exception can be safely ignored by non-terminal flow operator if and only if it was caught by its owner * (see usages of [checkOwnership]). + * Therefore, the [owner] parameter must be unique for every invocation of every operator. */ internal expect class AbortFlowException(owner: Any) : CancellationException { val owner: Any From a1bacb83d1686dfebe7e80c32cfd4e35591cbfa0 Mon Sep 17 00:00:00 2001 From: Dmitry Khalanskiy Date: Wed, 14 Feb 2024 14:21:26 +0100 Subject: [PATCH 3/3] Test exception stealing between instances of one operator --- .../common/src/flow/internal/Combine.kt | 2 +- .../test/flow/operators/OnCompletionTest.kt | 26 ++++++++++++++++++- 2 files changed, 26 insertions(+), 2 deletions(-) diff --git a/kotlinx-coroutines-core/common/src/flow/internal/Combine.kt b/kotlinx-coroutines-core/common/src/flow/internal/Combine.kt index cd764a5f61..60fcc8e04c 100644 --- a/kotlinx-coroutines-core/common/src/flow/internal/Combine.kt +++ b/kotlinx-coroutines-core/common/src/flow/internal/Combine.kt @@ -124,7 +124,7 @@ internal fun zipImpl(flow: Flow, flow2: Flow, transform: sus flow.collect { value -> withContextUndispatched(scopeContext, Unit, cnt) { val otherValue = second.receiveCatching().getOrElse { - throw it ?:AbortFlowException(collectJob) + throw it ?: AbortFlowException(collectJob) } emit(transform(value, NULL.unbox(otherValue))) } diff --git a/kotlinx-coroutines-core/common/test/flow/operators/OnCompletionTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/OnCompletionTest.kt index 38b3867c51..14ee53164d 100644 --- a/kotlinx-coroutines-core/common/test/flow/operators/OnCompletionTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/operators/OnCompletionTest.kt @@ -357,7 +357,31 @@ class OnCompletionTest : TestBase() { } .first() - finish(7) + // `take` doesn't eat the exception thrown by another `take`: + flowOf(1, 2, 3) + .take(2) + .onCompletion { + expect(8) + assertNotNull(it) + } + .take(1) + .collect { + expect(7) + } + + // `zip` doesn't eat the exception thrown by another `zip`: + flowOf(1, 2, 3) + .zip(flowOf(4, 5)) { a, b -> a + b } + .onCompletion { + expect(10) + assertNotNull(it) + } + .zip(flowOf(6)) { a, b -> a + b } + .collect { + expect(9) + } + + finish(11) } /**