From c02c44c0ce1e68554af31a0bb9f48d12bc33b8af Mon Sep 17 00:00:00 2001 From: Rick Busarow Date: Mon, 16 Dec 2019 17:38:51 -0600 Subject: [PATCH 1/2] shareIn and cache operators Fixes #1261 --- .../common/src/flow/internal/SharedFlow.kt | 105 +++++++++ .../common/src/flow/operators/Share.kt | 166 ++++++++++++++ .../common/src/internal/CircularArray.kt | 89 ++++++++ .../common/test/flow/internal/CacheTest.kt | 81 +++++++ .../common/test/flow/internal/ShareInTest.kt | 205 ++++++++++++++++++ 5 files changed, 646 insertions(+) create mode 100644 kotlinx-coroutines-core/common/src/flow/internal/SharedFlow.kt create mode 100644 kotlinx-coroutines-core/common/src/flow/operators/Share.kt create mode 100644 kotlinx-coroutines-core/common/src/internal/CircularArray.kt create mode 100644 kotlinx-coroutines-core/common/test/flow/internal/CacheTest.kt create mode 100644 kotlinx-coroutines-core/common/test/flow/internal/ShareInTest.kt diff --git a/kotlinx-coroutines-core/common/src/flow/internal/SharedFlow.kt b/kotlinx-coroutines-core/common/src/flow/internal/SharedFlow.kt new file mode 100644 index 0000000000..0e2044b74f --- /dev/null +++ b/kotlinx-coroutines-core/common/src/flow/internal/SharedFlow.kt @@ -0,0 +1,105 @@ +/* + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.flow.internal + +import kotlinx.coroutines.* +import kotlinx.coroutines.channels.* +import kotlinx.coroutines.flow.* +import kotlinx.coroutines.internal.* +import kotlinx.coroutines.sync.* + +internal fun Flow.asCachedFlow( + cacheHistory: Int +): Flow { + + require(cacheHistory > 0) { "cacheHistory parameter must be greater than 0, but was $cacheHistory" } + + val cache = CircularArray(cacheHistory) + + return onEach { value -> + // While flowing, also record all values in the cache. + cache.add(value) + }.onStart { + // Before emitting any values in sourceFlow, + // emit any cached values starting with the oldest. + cache.forEach { emit(it) } + } +} + +internal fun Flow.asSharedFlow( + scope: CoroutineScope, cacheHistory: Int +): Flow = SharedFlow(this, scope, cacheHistory) + +/** + * An auto-resetting [broadcast] flow. It tracks the number of active collectors, and automatically resets when + * the number reaches 0. + * + * `SharedFlow` has an optional [cache], where the latest _n_ elements emitted by the source Flow will be replayed to + * late collectors. + * + * ### Upon reset + * 1) The underlying [BroadcastChannel] is closed. A new BroadcastChannel will be created when a new collector starts. + * 2) The cache is reset. New collectors will not receive values from before the reset, but will generate a new cache. + */ +internal class SharedFlow( + private val sourceFlow: Flow, + private val scope: CoroutineScope, + private val cacheHistory: Int +) : Flow { + + private var refCount = 0 + private var cache = CircularArray(cacheHistory) + private val mutex = Mutex(false) + + init { + require(cacheHistory >= 0) { "cacheHistory parameter must be at least 0, but was $cacheHistory" } + } + + public override suspend fun collect( + collector: FlowCollector + ) = collector.emitAll(createFlow()) + + // Replay happens per new collector, if cacheHistory > 0. + private suspend fun createFlow(): Flow = getChannel() + .asFlow() + .replayIfNeeded() + .onCompletion { onCollectEnd() } + + // lazy holder for the BroadcastChannel, which is reset whenever all collection ends + private var lazyChannelRef = createLazyChannel() + + // must be lazy so that the broadcast doesn't begin immediately after a reset + private fun createLazyChannel() = lazy(LazyThreadSafetyMode.NONE) { + sourceFlow.cacheIfNeeded() + .broadcastIn(scope) + } + + private fun Flow.replayIfNeeded(): Flow = if (cacheHistory > 0) { + onStart { + cache.forEach { + emit(it) + } + } + } else this + + private fun Flow.cacheIfNeeded(): Flow = if (cacheHistory > 0) { + onEach { value -> + // While flowing, also record all values in the cache. + cache.add(value) + } + } else this + + private fun reset() { + cache = CircularArray(cacheHistory) + lazyChannelRef = createLazyChannel() + } + + private suspend fun onCollectEnd() = mutex.withLock { if (--refCount == 0) reset() } + private suspend fun getChannel(): BroadcastChannel = mutex.withLock { + refCount++ + lazyChannelRef.value + } + +} diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Share.kt b/kotlinx-coroutines-core/common/src/flow/operators/Share.kt new file mode 100644 index 0000000000..e84123041e --- /dev/null +++ b/kotlinx-coroutines-core/common/src/flow/operators/Share.kt @@ -0,0 +1,166 @@ +/* + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +@file:JvmMultifileClass +@file:JvmName("FlowKt") + +package kotlinx.coroutines.flow + +import kotlinx.coroutines.* +import kotlinx.coroutines.channels.* +import kotlinx.coroutines.flow.internal.* +import kotlin.jvm.* + +/** + * A "cached" [Flow] which will record the last [history] collected values. + * + * When a collector begins collecting after values have already been recorded, + * those values will be collected *before* values from the receiver [Flow] are collected. + * + * example: + * ```Kotlin + * val ints = flowOf(1, 2, 3, 4).cache(2) // cache the last 2 values + * + * ints.take(4).collect { ... } // 4 values are emitted, but also recorded. The last 2 remain. + * + * ints.collect { ... } // collects [3, 4, 1, 2, 3, 4] + * ``` + * + * Throws [IllegalArgumentException] if size parameter is not greater than 0 + * + * @param history the number of items to keep in the [Flow]'s history -- must be greater than 0 + */ +@FlowPreview +public fun Flow.cache(history: Int): Flow = asCachedFlow(history) + +/** + * Creates a [broadcast] coroutine which collects the [Flow] receiver and shares with multiple collectors. + * + * A [BroadcastChannel] with [default][Channel.Factory.BUFFERED] buffer size is created. + * Use [buffer] operator on the flow before calling `shareIn` to specify a value other than + * default and to control what happens when data is produced faster than it is consumed, + * that is to control back-pressure behavior. + * + * Concurrent collectors will all collect from a single [broadcast] flow. This flow will be cancelled automatically + * when it is no longer being collected, and the underlying channel will be closed. + * + * If a new collector is added after the channel has been closed, a new channel will be created. + * + * By default, this flow is effectively **stateless** in that collectors will only receive values emitted after collection begins. + * + * example: + * + * ``` + * val sourceFlow = flowOf(1, 2, 3, 4, 5) + * .onStart { println("start source") } + * .onEach { println("emit $it") } + * .onCompletion { println("complete source") } + * .shareIn(this) + * + * val a = async { sourceFlow.toList() } + * val b = async { sourceFlow.toList() } // collect concurrently + * + * println(a.await()) + * println(b.await()) + * + * println("** break **") + * + * println(sourceFlow.toList()) + * + * prints: + * + * start source + * emit 1 + * emit 2 + * emit 3 + * emit 4 + * emit 5 + * complete source + * [1, 2, 3, 4, 5] + * [1, 2, 3, 4, 5] + * ** break ** + * start source + * emit 1 + * emit 2 + * emit 3 + * emit 4 + * emit 5 + * complete source + * [1, 2, 3, 4, 5] + * + * ``` + * ### Caching + * + * When a shared flow is cached, the values are recorded as they are emitted from the source Flow. + * They are then replayed for each new subscriber. + * + * When a shared flow is reset, the cached values are cleared. + * + * example: + * + * ``` + * val sourceFlow = flowOf(1, 2, 3, 4, 5) + * .onEach { + * delay(50) + * println("emit $it") + * }.shareIn(this, 1) + * + * val a = async { sourceFlow.toList() } + * delay(125) + * val b = async { sourceFlow.toList() } // begin collecting after "emit 3" + * + * println(a.await()) + * println(b.await()) + * + * println("** break **") + * + * println(sourceFlow.toList()) // the shared flow has been reset, so the cached values are cleared + * + * prints: + * + * emit 1 + * emit 2 + * emit 3 + * emit 4 + * emit 5 + * [1, 2, 3, 4, 5] + * [2, 3, 4, 5] + * ** break ** + * emit 1 + * emit 2 + * emit 3 + * emit 4 + * emit 5 + * [1, 2, 3, 4, 5] + * + * ``` + * + * In order to have cached values persist across resets, use `cache(n)` before `shareIn(...)`. + * + * example: + * + * ``` + * // resets cache whenever the Flow is reset + * flowOf(1, 2, 3).shareIn(myScope, 3) + * + * // persists cache across resets + * flowOf(1, 2, 3).cached(3).shareIn(myScope) + * ``` + * + * ### Cancellation semantics + * 1) Flow consumer is cancelled when the original channel is cancelled. + * 2) Flow consumer completes normally when the original channel completes (~is closed) normally. + * 3) Collection is cancelled when the (scope)[CoroutineScope] parameter is cancelled, + * thereby ending the consumer when it has run out of elements. + * 4) If the flow consumer fails with an exception, subscription is cancelled. + * + * @param scope The [CoroutineScope] used to create the [broadcast] coroutine. Cancellation of this scope + * will close the underlying [BroadcastChannel]. + * @param cacheHistory (default = 0). Any value greater than zero will add a [cache] to the shared Flow. + * + */ +@FlowPreview +fun Flow.shareIn( + scope: CoroutineScope, cacheHistory: Int = 0 +): Flow = asSharedFlow(scope, cacheHistory) diff --git a/kotlinx-coroutines-core/common/src/internal/CircularArray.kt b/kotlinx-coroutines-core/common/src/internal/CircularArray.kt new file mode 100644 index 0000000000..e02bb9e308 --- /dev/null +++ b/kotlinx-coroutines-core/common/src/internal/CircularArray.kt @@ -0,0 +1,89 @@ +package kotlinx.coroutines.internal + +import kotlinx.coroutines.* + + +/** + * CircularArray implementation which will hold the latest of up to `size` elements. + * + * After the cache has been filled, all further additions will overwrite the least recent value. + * + * @param size the maximum number of elements to store in the array + */ +internal class CircularArray(size: Int) : Iterable { + + private val array: Array = arrayOfNulls(size) + private var count: Int = 0 + private var tail: Int = -1 + + /** + * Adds [item] to the [CircularArray]. + * + * If the `CircularArray` has not yet been filled, + * `item` will simply be added to the next available slot. + * + * If the `CircularArray` has already been filled, + * `item` will replace the oldest item already in the array. + * + * example: + * ``` + * val ca = CircularArray(3) + * + * ca.add(0) // ca contents : [0, null, null] + * ca.add(1) // ca contents : [0, 1, null] + * ca.add(2) // ca contents : [0, 1, 2] + * // overwrite the oldest value + * ca.add(3) // ca contents : [3, 1, 2] + * ``` + */ + public fun add(item: T) { + tail = (tail + 1) % array.size + array[tail] = item + if (count < array.size) count++ + } + + /** + * Iterates over the [CircularArray]. + * + * Order is always first-in-first-out, with the oldest item being used first. + * + * example: + * ``` + * val ca = CircularArray(3) + * + * ca.add(0) // ca contents : [0, null, null] + * ca.add(1) // ca contents : [0, 1, null] + * ca.add(2) // ca contents : [0, 1, 2] + * // overwrite the oldest value + * ca.add(3) // ca contents : [3, 1, 2] + * + * ca.forEach { ... } // order : [1, 2, 3] + * ``` + */ + public override fun iterator(): Iterator = object : Iterator { + private val arraySnapshot = array.copyOf() + private val tailSnapshot = tail + + private var _index = 0 + + private val head: Int + get() = when (count) { + arraySnapshot.size -> (tailSnapshot + 1) % count + else -> 0 + } + + @Suppress("UNCHECKED_CAST") + private fun get(index: Int): T = when (count) { + arraySnapshot.size -> arraySnapshot[(head + index) % arraySnapshot.size] as T + else -> arraySnapshot[index] as T + } + + override fun hasNext(): Boolean = _index < count + override fun next(): T = get(_index++) + + } + + public override fun toString(): String = "$classSimpleName[array=${contentToString()}]" + + private fun contentToString(): String = joinToString { "$it" } +} diff --git a/kotlinx-coroutines-core/common/test/flow/internal/CacheTest.kt b/kotlinx-coroutines-core/common/test/flow/internal/CacheTest.kt new file mode 100644 index 0000000000..57797d6567 --- /dev/null +++ b/kotlinx-coroutines-core/common/test/flow/internal/CacheTest.kt @@ -0,0 +1,81 @@ +/* + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.flow.internal + +import kotlinx.coroutines.* +import kotlinx.coroutines.flow.* +import kotlin.test.* + +class CacheTest : TestBase() { + + @Test + fun illegalHistorySizes_throwException() = runTest { + + assertFailsWith { flowOf("a").cache(-3) } + assertFailsWith { flowOf("a").cache(-2) } + assertFailsWith { flowOf("a").cache(-1) } + assertFailsWith { flowOf("a").cache(0) } + } + + @Test + fun firstCollection_doesNotEmitCache() = runTest { + + val list = listOf(1, 2, 3, 4, 5) + + val flow = list.asFlow().cache(2) + + val collected = flow.toList() + + assertEquals(list, collected) + } + + @Test + fun secondCollection_receivesCacheFirst() = runTest { + + val list = listOf(1, 2, 3, 4, 5) + + val flow = list.asFlow().cache(2) + + flow.collect() // collect [1, 2, 3, 4, 5], cache = [4, 5] + + val collect2 = flow.toList() + + assertEquals(listOf(4, 5, 1, 2, 3, 4, 5), collect2) + } + + @Test + fun thirdCollection_getsUpdatedCache() = runTest { + + val list = listOf(1, 2, 3, 4, 5) + + val flow = list.asFlow().cache(2) + + flow.take(4).collect() // collect [1, 2, 3, 4], cache = [3, 4] + flow.take(3).collect() // collect [3, 4, 1], cache = [4, 1] + + val collect3 = flow.toList() + + assertEquals(listOf(4, 1, 1, 2, 3, 4, 5), collect3) + } + + @Test + fun largeCache_buildsOverMultipleCollectors() = runTest { + + val list = listOf(1, 2, 3, 4, 5) + + val flow = list.asFlow().cache(8) + + // collect twice to generate the cache + flow.take(4).collect() // collect [1, 2, 3, 4], cache = [1, 2, 3, 4] + flow.collect() // collect [1, 2, 3, 4, 1, 2, 3, 4, 5], cache = [2, 3, 4, 1, 2, 3, 4, 5] + + val collect3 = flow.toList() + + val expected = listOf(2, 3, 4, 1, 2, 3, 4, 5, 1, 2, 3, 4, 5) + + assertEquals(expected, collect3) + } + +} diff --git a/kotlinx-coroutines-core/common/test/flow/internal/ShareInTest.kt b/kotlinx-coroutines-core/common/test/flow/internal/ShareInTest.kt new file mode 100644 index 0000000000..04140a420e --- /dev/null +++ b/kotlinx-coroutines-core/common/test/flow/internal/ShareInTest.kt @@ -0,0 +1,205 @@ +/* + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.flow.internal + +import kotlinx.coroutines.* +import kotlinx.coroutines.channels.* +import kotlinx.coroutines.flow.* +import kotlinx.coroutines.sync.* +import kotlin.test.* + +class ShareInTest : TestBase() { + + @Test + fun oneConsumer_completesSource() = runTest { + + var startInvocations = 0 + var completeInvocations = 0 + + val flow = flowOf(1, 2, 3, 4, 5) + .onStart { startInvocations++ } + .onCompletion { completeInvocations++ } + .shareIn(this) + + val one = flow.toList() + + assertEquals(listOf(1, 2, 3, 4, 5), one) + + assertEquals(1, startInvocations) + assertEquals(1, completeInvocations) + + } + + @Test + fun sequentialConsumers_completeSourceEachTime() = runTest { + + var startInvocations = 0 + var completeInvocations = 0 + + val flow = flowOf(1, 2, 3, 4, 5) + .onStart { startInvocations++ } + .onCompletion { completeInvocations++ } + .shareIn(this) + + val one = flow.toList() + val two = flow.toList() + + assertEquals(listOf(1, 2, 3, 4, 5), one) + assertEquals(listOf(1, 2, 3, 4, 5), two) + + assertEquals(2, startInvocations) + assertEquals(2, completeInvocations) + + } + + @Test + fun concurrentConsumers_shareOneSource() = runTest { + + var startInvocations = 0 + var completeInvocations = 0 + + val lock = Channel() + + val flow = flowOf(1, 2, 3, 4, 5) + .onStart { startInvocations++ } + .onCompletion { completeInvocations++ } + .shareIn(this) + + val one = async { + flow.onEach { + lock.receive() + lock.send(Unit) + } + .toList() + } + + val two = async { + flow.onEach { + lock.send(Unit) + lock.receive() + } + .toList() + } + + assertEquals(listOf(1, 2, 3, 4, 5), one.await()) + assertEquals(listOf(1, 2, 3, 4, 5), two.await()) + + assertEquals(1, startInvocations) + assertEquals(1, completeInvocations) + } + + @Test + fun lateConsumer_onlyGetsNewValues() = runTest { + + val lock = BroadcastChannel(1) + + val sharedLock = lock.openSubscription() + val oneLock = lock.openSubscription() + + val flow = flowOf(1, 2, 3, 4, 5) + .onEach { sharedLock.receive() } + .shareIn(this) + + val one = async { + flow.onEach { oneLock.receive() } + .toList() + } + + lock.send(Unit) // emit(1) + lock.send(Unit) // emit(2) + + val two = async { + + lock.send(Unit) // emit(3) after this coroutine has started + + flow.onEach { + lock.send(Unit) + } + .toList() + } + + assertEquals(listOf(1, 2, 3, 4, 5), one.await()) + assertEquals(listOf(3, 4, 5), two.await()) + } + + @Test + fun cache_replaysForLateConsumers() = runTest { + + val sourceLock = Mutex(true) + val collectorLock = Mutex(true) + + val flow = flowOf(1, 2, 3, 4, 5) + .onEach { if (it == 4) sourceLock.withLock { } } // wait for second consumer to begin before continuing + .shareIn(this, 2) + + val one = async { + flow.onEach { if (it == 2) collectorLock.unlock() } + .toList() + } + + val two = async { + + collectorLock.withLock { + flow.onEach { if (it == 3) sourceLock.unlock() } + .toList() + } + } + + assertEquals(listOf(1, 2, 3, 4, 5), one.await()) + assertEquals(listOf(2, 3, 4, 5), two.await()) + + } + + @Test + fun refCountOfZero_resetsCache() = runTest { + + val flow = flowOf(1, 2, 3, 4, 5) + .shareIn(this, 2) + + val collect1 = flow.toList() + + assertEquals(listOf(1, 2, 3, 4, 5), collect1) + + val collect2 = flow.toList() + + assertEquals(listOf(1, 2, 3, 4, 5), collect2) + + } + + @Test + fun closedCoroutineScope_emitsRemainingValuesToSlowCollectors() = runTest({ + it is JobCancellationException + }) { + + val scope = CoroutineScope(Job()) + + val sourceLock = Channel() + val collectorLock = Channel() + + val sourceFlow = flowOf(1, 2, 3, 4, 5) + .onEach { + if (it == 4) { + sourceLock.send(Unit) + hang { } + } + } + .shareIn(scope) + + val listDeferred = async { + sourceFlow.onEach { + if (it == 2) { + collectorLock.receive() + } + }.toList() + } + + sourceLock.receive() + scope.cancel() + collectorLock.send(Unit) + + assertEquals(listOf(1, 2, 3, 4), listDeferred.await()) + } + +} From 0cace2ae827f44a325ade8a952a119bfa3e79f7d Mon Sep 17 00:00:00 2001 From: Rick Busarow Date: Sun, 12 Jan 2020 21:29:31 -0600 Subject: [PATCH 2/2] cancel SharedFlow's internal BroadcastChannel before resetting it refCount reaches 0 --- .../common/src/flow/internal/SharedFlow.kt | 16 ++-- .../common/test/flow/internal/ShareInTest.kt | 87 +++++++++++++------ 2 files changed, 71 insertions(+), 32 deletions(-) diff --git a/kotlinx-coroutines-core/common/src/flow/internal/SharedFlow.kt b/kotlinx-coroutines-core/common/src/flow/internal/SharedFlow.kt index 0e2044b74f..8117d66a75 100644 --- a/kotlinx-coroutines-core/common/src/flow/internal/SharedFlow.kt +++ b/kotlinx-coroutines-core/common/src/flow/internal/SharedFlow.kt @@ -67,6 +67,11 @@ internal class SharedFlow( .replayIfNeeded() .onCompletion { onCollectEnd() } + private suspend fun getChannel(): BroadcastChannel = mutex.withLock { + refCount++ + lazyChannelRef.value + } + // lazy holder for the BroadcastChannel, which is reset whenever all collection ends private var lazyChannelRef = createLazyChannel() @@ -91,15 +96,12 @@ internal class SharedFlow( } } else this + private suspend fun onCollectEnd() = mutex.withLock { if (--refCount == 0) reset() } + private fun reset() { cache = CircularArray(cacheHistory) - lazyChannelRef = createLazyChannel() - } - private suspend fun onCollectEnd() = mutex.withLock { if (--refCount == 0) reset() } - private suspend fun getChannel(): BroadcastChannel = mutex.withLock { - refCount++ - lazyChannelRef.value + lazyChannelRef.value.cancel() + lazyChannelRef = createLazyChannel() } - } diff --git a/kotlinx-coroutines-core/common/test/flow/internal/ShareInTest.kt b/kotlinx-coroutines-core/common/test/flow/internal/ShareInTest.kt index 04140a420e..e3400fedb5 100644 --- a/kotlinx-coroutines-core/common/test/flow/internal/ShareInTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/internal/ShareInTest.kt @@ -168,38 +168,75 @@ class ShareInTest : TestBase() { } + @Test - fun closedCoroutineScope_emitsRemainingValuesToSlowCollectors() = runTest({ - it is JobCancellationException - }) { + fun refCountOfZero_cancelsSourceFlow() = runTest { - val scope = CoroutineScope(Job()) + val sourceBroadcastChannel = BroadcastChannel(1) - val sourceLock = Channel() - val collectorLock = Channel() + val sourceFlow = sourceBroadcastChannel.asFlow() - val sourceFlow = flowOf(1, 2, 3, 4, 5) - .onEach { - if (it == 4) { - sourceLock.send(Unit) - hang { } - } - } - .shareIn(scope) - - val listDeferred = async { - sourceFlow.onEach { - if (it == 2) { - collectorLock.receive() - } - }.toList() + val sharedFlow = sourceFlow.shareIn(this) + + /* + first sharing session begins + */ + val listOneDeferred = async(start = CoroutineStart.UNDISPATCHED) { + expect(1) + sharedFlow + .take(1) + .toList() } - sourceLock.receive() - scope.cancel() - collectorLock.send(Unit) + expect(2) + yield() // ensure that the "listOneDeferred" async has begun collection before sending + expect(3) + + sourceBroadcastChannel.send(1) + + assertEquals(listOf(1), listOneDeferred.await()) + + val newReceiveChannel = sourceBroadcastChannel.openSubscription() + + val sendList = listOf(2, 3, 4, 5) + + /* + second sharing session begins + */ + val listTwoDeferred = async(start = CoroutineStart.UNDISPATCHED) { + expect(4) + sharedFlow.take(sendList.size) + .toList() + } + + val listThreeDeferred = async { + newReceiveChannel.consumeAsFlow() + .shareIn(this) + .take(sendList.size) + .toList() + } + + assertEquals(false, sourceBroadcastChannel.isClosedForSend) + + expect(5) + yield() // ensure that the "listTwoDeferred" async has begun collection before sending + expect(6) + + sendList.forEach { + sourceBroadcastChannel.send(it) + } + + expect(7) + + // Reaching here means that the sends aren't suspending, + // which means that the internal channel created during the first sharing session was properly closed, + // else there would be deadlock. + assertEquals(sendList, listTwoDeferred.await()) + assertEquals(sendList, listThreeDeferred.await()) + + assertEquals(false, sourceBroadcastChannel.isClosedForSend) - assertEquals(listOf(1, 2, 3, 4), listDeferred.await()) + finish(8) } }