From 8baace074aada05e4139f9260bd1da051d4ae11e Mon Sep 17 00:00:00 2001 From: Zachary Klippenstein Date: Fri, 7 Jun 2019 09:35:23 -0700 Subject: [PATCH] Upgrade coroutines dep to 1.3.0-M1. - Breakage in `CoroutineWorkflow` was caused by https://github.com/Kotlin/kotlinx.coroutines/pull/1158. - Breakage in `ReactorAsWorkflowIntegrationTest` was probably caused by https://github.com/Kotlin/kotlinx.coroutines/pull/1239. --- kotlin/build.gradle | 2 +- .../com/squareup/workflow/legacy/CoroutineWorkflow.kt | 9 ++++++--- .../squareup/workflow/legacy/CoroutineWorkflowTest.kt | 1 - .../workflow/legacy/ReactorAsWorkflowIntegrationTest.kt | 7 ++++++- .../src/main/java/com/squareup/workflow/Worker.kt | 5 ++--- .../src/main/java/com/squareup/workflow/WorkflowHost.kt | 4 +--- 6 files changed, 16 insertions(+), 12 deletions(-) diff --git a/kotlin/build.gradle b/kotlin/build.gradle index 893882e44..f2967c622 100644 --- a/kotlin/build.gradle +++ b/kotlin/build.gradle @@ -36,7 +36,7 @@ buildscript { 'intellijAnnotations': '13.0', 'junit': '4.12', 'kotlin': '1.3.21', - 'kotlinCoroutines': '1.2.1', + 'kotlinCoroutines': '1.3.0-M1', 'ktlintPlugin': '5.1.0', 'mavenPublishPlugin': '0.6.0', 'mockito': '2.7.5', diff --git a/kotlin/legacy/legacy-workflow-core/src/main/java/com/squareup/workflow/legacy/CoroutineWorkflow.kt b/kotlin/legacy/legacy-workflow-core/src/main/java/com/squareup/workflow/legacy/CoroutineWorkflow.kt index 8f7ca0cd6..ff06c3a0e 100644 --- a/kotlin/legacy/legacy-workflow-core/src/main/java/com/squareup/workflow/legacy/CoroutineWorkflow.kt +++ b/kotlin/legacy/legacy-workflow-core/src/main/java/com/squareup/workflow/legacy/CoroutineWorkflow.kt @@ -17,6 +17,7 @@ package com.squareup.workflow.legacy +import kotlinx.coroutines.CancellationException import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Deferred import kotlinx.coroutines.async @@ -74,10 +75,12 @@ fun CoroutineScope.workflow( // if the event channel was closed. try { events.offer(event) + } catch (e: CancellationException) { + // This means that the workflow was cancelled. Senders shouldn't care if the workflow + // accepted the event or not. } catch (e: ClosedSendChannelException) { - // This may mean the workflow was canceled or finished, or that the workflow closed the - // events channel itself. Either way, senders shouldn't care if the workflow accepted the - // event or not. + // This may mean the workflow finished or that the workflow closed the events channel + // itself. Senders shouldn't care if the workflow accepted the event or not. } } diff --git a/kotlin/legacy/legacy-workflow-core/src/test/java/com/squareup/workflow/legacy/CoroutineWorkflowTest.kt b/kotlin/legacy/legacy-workflow-core/src/test/java/com/squareup/workflow/legacy/CoroutineWorkflowTest.kt index b5f2abcbd..f7a0db65c 100644 --- a/kotlin/legacy/legacy-workflow-core/src/test/java/com/squareup/workflow/legacy/CoroutineWorkflowTest.kt +++ b/kotlin/legacy/legacy-workflow-core/src/test/java/com/squareup/workflow/legacy/CoroutineWorkflowTest.kt @@ -21,7 +21,6 @@ import kotlinx.coroutines.CancellationException import kotlinx.coroutines.CoroutineName import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers.Unconfined -import kotlinx.coroutines.cancel import kotlinx.coroutines.channels.ReceiveChannel import kotlinx.coroutines.channels.consume import kotlinx.coroutines.suspendCancellableCoroutine diff --git a/kotlin/legacy/legacy-workflow-core/src/test/java/com/squareup/workflow/legacy/ReactorAsWorkflowIntegrationTest.kt b/kotlin/legacy/legacy-workflow-core/src/test/java/com/squareup/workflow/legacy/ReactorAsWorkflowIntegrationTest.kt index 383421dd2..524fcb659 100644 --- a/kotlin/legacy/legacy-workflow-core/src/test/java/com/squareup/workflow/legacy/ReactorAsWorkflowIntegrationTest.kt +++ b/kotlin/legacy/legacy-workflow-core/src/test/java/com/squareup/workflow/legacy/ReactorAsWorkflowIntegrationTest.kt @@ -103,6 +103,7 @@ class ReactorAsWorkflowIntegrationTest { subscribeToState(workflow) workflow.cancel() + assertEquals(SecondState("hello"), stateSub.poll()) assertFailsWith { stateSub.poll() } assertTrue(stateSub.isClosedForReceive) } @@ -166,6 +167,7 @@ class ReactorAsWorkflowIntegrationTest { assertFalse(resultSub.isCompleted) secondStateDeferred.complete(SecondState("foo")) + assertEquals(SecondState("foo"), stateSub.poll()) assertTrue(stateSub.isClosedForReceive) assertEquals("all done", resultSub.getCompleted()) } @@ -255,7 +257,8 @@ class ReactorAsWorkflowIntegrationTest { assertTrue(cancelled) } - @Test fun `exception is propagated when state subscriber throws from second onNext asynchronously`() { + @Test + fun `exception is propagated when state subscriber throws from second onNext asynchronously`() { val trigger = CompletableDeferred() reactor = object : MockReactor() { override suspend fun onReact( @@ -372,6 +375,7 @@ class ReactorAsWorkflowIntegrationTest { assertFalse(resultSub.isCompleted) workflow.sendEvent("foo") + assertEquals(SecondState("foo"), stateSub.poll()) assertTrue(stateSub.isClosedForReceive) assertEquals("i heard you like events", resultSub.getCompleted()) } @@ -404,6 +408,7 @@ class ReactorAsWorkflowIntegrationTest { } } start("foo") + assertEquals(FirstState("foo"), stateSub.poll()) trigger.complete(Unit) runBlocking { diff --git a/kotlin/workflow-core/src/main/java/com/squareup/workflow/Worker.kt b/kotlin/workflow-core/src/main/java/com/squareup/workflow/Worker.kt index b01ce4e72..b2fe9c445 100644 --- a/kotlin/workflow-core/src/main/java/com/squareup/workflow/Worker.kt +++ b/kotlin/workflow-core/src/main/java/com/squareup/workflow/Worker.kt @@ -23,7 +23,6 @@ import com.squareup.workflow.Worker.Emitter import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Deferred import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.FlowPreview import kotlinx.coroutines.channels.BroadcastChannel import kotlinx.coroutines.channels.ReceiveChannel import kotlinx.coroutines.channels.consumeEach @@ -328,7 +327,7 @@ inline fun ReceiveChannel.asWorker( * This **SHOULD NOT** be used in production code. */ @VeryExperimentalWorkflow -@FlowPreview +@ExperimentalCoroutinesApi inline fun Flow.asWorker( key: String = "" ): Worker = create(key) { emitAll(this@asWorker) } @@ -369,7 +368,7 @@ suspend inline fun Emitter.emitAll( * This **SHOULD NOT** be used in production code. */ @VeryExperimentalWorkflow -@FlowPreview +@ExperimentalCoroutinesApi suspend inline fun Emitter.emitAll(flow: Flow) { flow.collect { emitOutput(it) } } diff --git a/kotlin/workflow-runtime/src/main/java/com/squareup/workflow/WorkflowHost.kt b/kotlin/workflow-runtime/src/main/java/com/squareup/workflow/WorkflowHost.kt index 6278efe02..c2a5e527c 100644 --- a/kotlin/workflow-runtime/src/main/java/com/squareup/workflow/WorkflowHost.kt +++ b/kotlin/workflow-runtime/src/main/java/com/squareup/workflow/WorkflowHost.kt @@ -23,8 +23,6 @@ import kotlinx.coroutines.CoroutineName import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.GlobalScope -import kotlinx.coroutines.InternalCoroutinesApi -import kotlinx.coroutines.ObsoleteCoroutinesApi import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.channels.ReceiveChannel import kotlinx.coroutines.channels.consume @@ -160,7 +158,7 @@ interface WorkflowHost { * use [WorkflowHost.Factory] to create a [WorkflowHost], or one of the stream operators for your * favorite Rx library to map a stream of [InputT]s into [Update]s. */ -@UseExperimental(InternalCoroutinesApi::class, ObsoleteCoroutinesApi::class) +@UseExperimental(ExperimentalCoroutinesApi::class) suspend fun runWorkflowTree( workflow: StatefulWorkflow, inputs: () -> ReceiveChannel,