Skip to content

Commit

Permalink
Upgrade coroutines dep to 1.3.0-M1.
Browse files Browse the repository at this point in the history
 - Breakage in `CoroutineWorkflow` was caused by Kotlin/kotlinx.coroutines#1158.
 - Breakage in `ReactorAsWorkflowIntegrationTest` was probably caused by Kotlin/kotlinx.coroutines#1239.
  • Loading branch information
zach-klippenstein committed Jun 13, 2019
1 parent 8f60b8e commit 8baace0
Show file tree
Hide file tree
Showing 6 changed files with 16 additions and 12 deletions.
2 changes: 1 addition & 1 deletion kotlin/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ buildscript {
'intellijAnnotations': '13.0',
'junit': '4.12',
'kotlin': '1.3.21',
'kotlinCoroutines': '1.2.1',
'kotlinCoroutines': '1.3.0-M1',
'ktlintPlugin': '5.1.0',
'mavenPublishPlugin': '0.6.0',
'mockito': '2.7.5',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package com.squareup.workflow.legacy

import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.async
Expand Down Expand Up @@ -74,10 +75,12 @@ fun <S : Any, E : Any, O : Any> CoroutineScope.workflow(
// if the event channel was closed.
try {
events.offer(event)
} catch (e: CancellationException) {
// This means that the workflow was cancelled. Senders shouldn't care if the workflow
// accepted the event or not.
} catch (e: ClosedSendChannelException) {
// This may mean the workflow was canceled or finished, or that the workflow closed the
// events channel itself. Either way, senders shouldn't care if the workflow accepted the
// event or not.
// This may mean the workflow finished or that the workflow closed the events channel
// itself. Senders shouldn't care if the workflow accepted the event or not.
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers.Unconfined
import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.consume
import kotlinx.coroutines.suspendCancellableCoroutine
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ class ReactorAsWorkflowIntegrationTest {
subscribeToState(workflow)
workflow.cancel()

assertEquals(SecondState("hello"), stateSub.poll())
assertFailsWith<CancellationException> { stateSub.poll() }
assertTrue(stateSub.isClosedForReceive)
}
Expand Down Expand Up @@ -166,6 +167,7 @@ class ReactorAsWorkflowIntegrationTest {
assertFalse(resultSub.isCompleted)

secondStateDeferred.complete(SecondState("foo"))
assertEquals(SecondState("foo"), stateSub.poll())
assertTrue(stateSub.isClosedForReceive)
assertEquals("all done", resultSub.getCompleted())
}
Expand Down Expand Up @@ -255,7 +257,8 @@ class ReactorAsWorkflowIntegrationTest {
assertTrue(cancelled)
}

@Test fun `exception is propagated when state subscriber throws from second onNext asynchronously`() {
@Test
fun `exception is propagated when state subscriber throws from second onNext asynchronously`() {
val trigger = CompletableDeferred<Unit>()
reactor = object : MockReactor() {
override suspend fun onReact(
Expand Down Expand Up @@ -372,6 +375,7 @@ class ReactorAsWorkflowIntegrationTest {
assertFalse(resultSub.isCompleted)

workflow.sendEvent("foo")
assertEquals(SecondState("foo"), stateSub.poll())
assertTrue(stateSub.isClosedForReceive)
assertEquals("i heard you like events", resultSub.getCompleted())
}
Expand Down Expand Up @@ -404,6 +408,7 @@ class ReactorAsWorkflowIntegrationTest {
}
}
start("foo")
assertEquals(FirstState("foo"), stateSub.poll())
trigger.complete(Unit)

runBlocking {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import com.squareup.workflow.Worker.Emitter
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.channels.BroadcastChannel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.consumeEach
Expand Down Expand Up @@ -328,7 +327,7 @@ inline fun <reified T> ReceiveChannel<T>.asWorker(
* This **SHOULD NOT** be used in production code.
*/
@VeryExperimentalWorkflow
@FlowPreview
@ExperimentalCoroutinesApi
inline fun <reified T> Flow<T>.asWorker(
key: String = ""
): Worker<T> = create(key) { emitAll(this@asWorker) }
Expand Down Expand Up @@ -369,7 +368,7 @@ suspend inline fun <T> Emitter<T>.emitAll(
* This **SHOULD NOT** be used in production code.
*/
@VeryExperimentalWorkflow
@FlowPreview
@ExperimentalCoroutinesApi
suspend inline fun <T> Emitter<T>.emitAll(flow: Flow<T>) {
flow.collect { emitOutput(it) }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.InternalCoroutinesApi
import kotlinx.coroutines.ObsoleteCoroutinesApi
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.consume
Expand Down Expand Up @@ -160,7 +158,7 @@ interface WorkflowHost<out OutputT : Any, out RenderingT> {
* use [WorkflowHost.Factory] to create a [WorkflowHost], or one of the stream operators for your
* favorite Rx library to map a stream of [InputT]s into [Update]s.
*/
@UseExperimental(InternalCoroutinesApi::class, ObsoleteCoroutinesApi::class)
@UseExperimental(ExperimentalCoroutinesApi::class)
suspend fun <InputT, StateT, OutputT : Any, RenderingT> runWorkflowTree(
workflow: StatefulWorkflow<InputT, StateT, OutputT, RenderingT>,
inputs: () -> ReceiveChannel<InputT>,
Expand Down

0 comments on commit 8baace0

Please sign in to comment.