Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgrade coroutines dep to 1.3.0-M1. #409

Merged
merged 2 commits into from
Jun 26, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion kotlin/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ buildscript {
'intellijAnnotations': '13.0',
'junit': '4.12',
'kotlin': '1.3.21',
'kotlinCoroutines': '1.2.1',
'kotlinCoroutines': '1.3.0-M2',
'ktlintPlugin': '5.1.0',
'mavenPublishPlugin': '0.8.0',
'mockito': '2.7.5',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package com.squareup.workflow.legacy

import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.async
Expand Down Expand Up @@ -74,10 +75,12 @@ fun <S : Any, E : Any, O : Any> CoroutineScope.workflow(
// if the event channel was closed.
try {
events.offer(event)
} catch (e: CancellationException) {
// This means that the workflow was cancelled. Senders shouldn't care if the workflow
// accepted the event or not.
} catch (e: ClosedSendChannelException) {
// This may mean the workflow was canceled or finished, or that the workflow closed the
// events channel itself. Either way, senders shouldn't care if the workflow accepted the
// event or not.
// This may mean the workflow finished or that the workflow closed the events channel
// itself. Senders shouldn't care if the workflow accepted the event or not.
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers.Unconfined
import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.consume
import kotlinx.coroutines.suspendCancellableCoroutine
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ class ReactorAsWorkflowIntegrationTest {
subscribeToState(workflow)
workflow.cancel()

assertEquals(SecondState("hello"), stateSub.poll())
assertFailsWith<CancellationException> { stateSub.poll() }
assertTrue(stateSub.isClosedForReceive)
}
Expand Down Expand Up @@ -166,6 +167,7 @@ class ReactorAsWorkflowIntegrationTest {
assertFalse(resultSub.isCompleted)

secondStateDeferred.complete(SecondState("foo"))
assertEquals(SecondState("foo"), stateSub.poll())
assertTrue(stateSub.isClosedForReceive)
assertEquals("all done", resultSub.getCompleted())
}
Expand Down Expand Up @@ -255,7 +257,8 @@ class ReactorAsWorkflowIntegrationTest {
assertTrue(cancelled)
}

@Test fun `exception is propagated when state subscriber throws from second onNext asynchronously`() {
@Test
fun `exception is propagated when state subscriber throws from second onNext asynchronously`() {
val trigger = CompletableDeferred<Unit>()
reactor = object : MockReactor() {
override suspend fun onReact(
Expand Down Expand Up @@ -372,6 +375,7 @@ class ReactorAsWorkflowIntegrationTest {
assertFalse(resultSub.isCompleted)

workflow.sendEvent("foo")
assertEquals(SecondState("foo"), stateSub.poll())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this line for added coverage, or did the poll() call actually fix something?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Conflated channels now hang onto their last value even after they're closed, so we have to pull it out first now.

See Kotlin/kotlinx.coroutines#332

assertTrue(stateSub.isClosedForReceive)
assertEquals("i heard you like events", resultSub.getCompleted())
}
Expand Down Expand Up @@ -404,6 +408,7 @@ class ReactorAsWorkflowIntegrationTest {
}
}
start("foo")
assertEquals(FirstState("foo"), stateSub.poll())
trigger.complete(Unit)

runBlocking {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import com.squareup.workflow.Worker.Emitter
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.channels.BroadcastChannel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.consumeEach
Expand Down Expand Up @@ -329,7 +328,7 @@ inline fun <reified T> ReceiveChannel<T>.asWorker(
* This **SHOULD NOT** be used in production code.
*/
@VeryExperimentalWorkflow
@FlowPreview
@ExperimentalCoroutinesApi
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🍿

inline fun <reified T> Flow<T>.asWorker(
key: String = ""
): Worker<T> = create(key) { emitAll(this@asWorker) }
Expand Down Expand Up @@ -370,7 +369,7 @@ suspend inline fun <T> Emitter<T>.emitAll(
* This **SHOULD NOT** be used in production code.
*/
@VeryExperimentalWorkflow
@FlowPreview
@ExperimentalCoroutinesApi
suspend inline fun <T> Emitter<T>.emitAll(flow: Flow<T>) {
flow.collect { emitOutput(it) }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.InternalCoroutinesApi
import kotlinx.coroutines.ObsoleteCoroutinesApi
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.consume
Expand Down Expand Up @@ -160,7 +158,7 @@ interface WorkflowHost<out OutputT : Any, out RenderingT> {
* use [WorkflowHost.Factory] to create a [WorkflowHost], or one of the stream operators for your
* favorite Rx library to map a stream of [InputT]s into [Update]s.
*/
@UseExperimental(InternalCoroutinesApi::class, ObsoleteCoroutinesApi::class)
@UseExperimental(ExperimentalCoroutinesApi::class)
suspend fun <InputT, StateT, OutputT : Any, RenderingT> runWorkflowTree(
workflow: StatefulWorkflow<InputT, StateT, OutputT, RenderingT>,
inputs: () -> ReceiveChannel<InputT>,
Expand Down