Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure that flow operators propagate the cancellation exceptions #4038

Merged
merged 3 commits into from
Feb 15, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions kotlinx-coroutines-core/common/src/flow/internal/Combine.kt
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ internal fun <T1, T2, R> zipImpl(flow: Flow<T1>, flow2: Flow<T2>, transform: sus
val collectJob = Job()
(second as SendChannel<*>).invokeOnClose {
// Optimization to avoid AFE allocation when the other flow is done
if (collectJob.isActive) collectJob.cancel(AbortFlowException(this@unsafeFlow))
if (collectJob.isActive) collectJob.cancel(AbortFlowException(collectJob))
}

try {
Expand All @@ -124,14 +124,14 @@ internal fun <T1, T2, R> zipImpl(flow: Flow<T1>, flow2: Flow<T2>, transform: sus
flow.collect { value ->
withContextUndispatched(scopeContext, Unit, cnt) {
val otherValue = second.receiveCatching().getOrElse {
throw it ?:AbortFlowException(this@unsafeFlow)
throw it ?:AbortFlowException(collectJob)
qwwdfsad marked this conversation as resolved.
Show resolved Hide resolved
}
emit(transform(value, NULL.unbox(otherValue)))
}
}
}
} catch (e: AbortFlowException) {
e.checkOwnership(owner = this@unsafeFlow)
e.checkOwnership(owner = collectJob)
} finally {
second.cancel()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ import kotlinx.coroutines.flow.*
* This exception can be safely ignored by non-terminal flow operator if and only if it was caught by its owner
* (see usages of [checkOwnership]).
*/
internal expect class AbortFlowException(owner: FlowCollector<*>) : CancellationException {
public val owner: FlowCollector<*>
internal expect class AbortFlowException(owner: Any) : CancellationException {
val owner: Any
qwwdfsad marked this conversation as resolved.
Show resolved Hide resolved
}

internal fun AbortFlowException.checkOwnership(owner: FlowCollector<*>) {
internal fun AbortFlowException.checkOwnership(owner: Any) {
if (this.owner !== owner) throw this
}

Expand Down
9 changes: 5 additions & 4 deletions kotlinx-coroutines-core/common/src/flow/operators/Limit.kt
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public fun <T> Flow<T>.dropWhile(predicate: suspend (T) -> Boolean): Flow<T> = f
public fun <T> Flow<T>.take(count: Int): Flow<T> {
require(count > 0) { "Requested element count $count should be positive" }
return flow {
val ownershipMarker = Any()
qwwdfsad marked this conversation as resolved.
Show resolved Hide resolved
var consumed = 0
try {
collect { value ->
Expand All @@ -56,18 +57,18 @@ public fun <T> Flow<T>.take(count: Int): Flow<T> {
if (++consumed < count) {
return@collect emit(value)
} else {
return@collect emitAbort(value)
return@collect emitAbort(value, ownershipMarker)
}
}
} catch (e: AbortFlowException) {
e.checkOwnership(owner = this)
e.checkOwnership(owner = ownershipMarker)
}
}
}

private suspend fun <T> FlowCollector<T>.emitAbort(value: T) {
private suspend fun <T> FlowCollector<T>.emitAbort(value: T, ownershipMarker: Any) {
emit(value)
throw AbortFlowException(this)
throw AbortFlowException(ownershipMarker)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -308,4 +308,66 @@ class OnCompletionTest : TestBase() {
.take(1)
.collect()
}

/**
* Tests that the operators that are used to limit the flow (like [take] and [zip]) faithfully propagate the
* cancellation exception to the original owner.
*/
@Test
fun testOnCompletionBetweenLimitingOperators() = runTest {
// `zip` doesn't eat the exception thrown by `take`:
flowOf(1, 2, 3)
.zip(flowOf(4, 5)) { a, b -> a + b }
.onCompletion {
expect(2)
assertNotNull(it)
}
.take(1)
.collect {
expect(1)
}

// `take` doesn't eat the exception thrown by `zip`:
flowOf(1, 2, 3)
.take(2)
.onCompletion {
expect(4)
assertNotNull(it)
}
.zip(flowOf(4)) { a, b -> a + b }
.collect {
expect(3)
}

// `take` doesn't eat the exception thrown by `first`:
flowOf(1, 2, 3)
.take(2)
.onCompletion {
expect(5)
assertNotNull(it)
}
.first()

// `zip` doesn't eat the exception thrown by `first`:
flowOf(1, 2, 3)
.zip(flowOf(4, 5)) { a, b -> a + b }
.onCompletion {
expect(6)
assertNotNull(it)
}
.first()

finish(7)
}

/**
* Tests that emitting new elements after completion doesn't overwrite the old elements.
*/
@Test
fun testEmittingElementsAfterCancellation() = runTest {
assertEquals(1, flowOf(1, 2, 3)
.take(100)
.onCompletion { emit(4) }
.first())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@ import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

internal actual class AbortFlowException actual constructor(
actual val owner: FlowCollector<*>
actual val owner: Any
) : CancellationException("Flow was aborted, no more elements needed")
internal actual class ChildCancelledException : CancellationException("Child of the scoped flow was cancelled")
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

internal actual class AbortFlowException actual constructor(
@JvmField @Transient actual val owner: FlowCollector<*>
@JvmField @Transient actual val owner: Any
) : CancellationException("Flow was aborted, no more elements needed") {

override fun fillInStackTrace(): Throwable {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

internal actual class AbortFlowException actual constructor(
actual val owner: FlowCollector<*>
actual val owner: Any
) : CancellationException("Flow was aborted, no more elements needed")
internal actual class ChildCancelledException : CancellationException("Child of the scoped flow was cancelled")