Skip to content

Commit

Permalink
wip: unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
zach-klippenstein committed Jul 8, 2019
1 parent b590f56 commit dab29a0
Show file tree
Hide file tree
Showing 4 changed files with 255 additions and 143 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.Job
import kotlinx.coroutines.ObsoleteCoroutinesApi
import kotlinx.coroutines.async
import kotlinx.coroutines.cancelChildren
import kotlinx.coroutines.channels.ConflatedBroadcastChannel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.coroutineScope
Expand Down Expand Up @@ -149,19 +149,14 @@ private suspend fun runTerminalWorkflow(
}
}

// Stop the runtime and return the exit code as soon as the workflow emits one.
val workflowJob = coroutineContext[Job]!!
return@runWorkflowIn async {
val exitCode = outputs.first()
// If we don't cancel the workflow runtime explicitly, coroutineScope will hang waiting for
// it to finish.
workflowJob.cancel(
CancellationException("TerminalWorkflowRunner completed with exit code $exitCode")
)
// TODO This won't work, this job will already have been cancelled.
return@async exitCode
}
return@runWorkflowIn async { outputs.first() }
}

return@coroutineScope result.await()
val exitCode = result.await()
// If we don't cancel the workflow runtime explicitly, coroutineScope will hang waiting for it to
// finish.
coroutineContext.cancelChildren(
CancellationException("TerminalWorkflowRunner completed with exit code $exitCode")
)
return@coroutineScope exitCode
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@
*/
package com.squareup.workflow

import com.squareup.workflow.internal.runWorkflowLoop
import com.squareup.workflow.internal.RealWorkflowLoop
import com.squareup.workflow.internal.WorkflowLoop
import com.squareup.workflow.internal.unwrapCancellationCause
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.CoroutineStart.ATOMIC
Expand Down Expand Up @@ -60,6 +61,7 @@ fun <InputT, OutputT : Any, RenderingT, ResultT> runWorkflowIn(
) -> ResultT
): ResultT = runWorkflowImpl(
scope,
RealWorkflowLoop,
workflow.asStatefulWorkflow(),
inputs,
initialSnapshot = initialSnapshot,
Expand All @@ -83,6 +85,7 @@ fun <InputT, StateT, OutputT : Any, RenderingT, ResultT> runWorkflowForTestFromS
) -> ResultT
): ResultT = runWorkflowImpl(
scope,
RealWorkflowLoop,
workflow,
inputs,
initialState = initialState,
Expand All @@ -94,8 +97,9 @@ fun <InputT, StateT, OutputT : Any, RenderingT, ResultT> runWorkflowForTestFromS
* TODO write documentation
*/
@UseExperimental(ExperimentalCoroutinesApi::class, FlowPreview::class)
private fun <InputT, StateT, OutputT : Any, RenderingT, ResultT> runWorkflowImpl(
internal fun <InputT, StateT, OutputT : Any, RenderingT, ResultT> runWorkflowImpl(
scope: CoroutineScope,
workflowLoop: WorkflowLoop,
workflow: StatefulWorkflow<InputT, StateT, OutputT, RenderingT>,
inputs: Flow<InputT>,
initialSnapshot: Snapshot?,
Expand All @@ -121,7 +125,7 @@ private fun <InputT, StateT, OutputT : Any, RenderingT, ResultT> runWorkflowImpl

workflowScope.launch(start = ATOMIC) {
// Run the workflow processing loop forever, or until it fails or is cancelled.
runWorkflowLoop(
workflowLoop.runWorkflowLoop(
workflow,
inputs,
initialSnapshot = initialSnapshot,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,78 +27,94 @@ import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.produceIn
import kotlinx.coroutines.selects.select

/**
* Loops forever, or until the coroutine is cancelled, processing the workflow tree and emitting
* updates by calling [onRendering] and [onOutput].
*
* This function is the lowest-level entry point into the runtime. Don't call this directly, instead
* call [com.squareup.workflow.runWorkflowIn].
*/
@UseExperimental(FlowPreview::class, ExperimentalCoroutinesApi::class)
internal suspend fun <InputT, StateT, OutputT : Any, RenderingT> runWorkflowLoop(
workflow: StatefulWorkflow<InputT, StateT, OutputT, RenderingT>,
inputs: Flow<InputT>,
initialSnapshot: Snapshot?,
initialState: StateT? = null,
onRendering: suspend (RenderingAndSnapshot<RenderingT>) -> Unit,
onOutput: suspend (OutputT) -> Unit
): Nothing = coroutineScope {
val inputsChannel = inputs.produceIn(this)
inputsChannel.consume {
var output: OutputT? = null
var input: InputT = inputsChannel.receive()
var inputsClosed = false
val rootNode = WorkflowNode(
id = workflow.id(),
workflow = workflow,
initialInput = input,
snapshot = initialSnapshot,
baseContext = coroutineContext,
initialState = initialState
)
@UseExperimental(ExperimentalCoroutinesApi::class)
internal interface WorkflowLoop {

/**
* Loops forever, or until the coroutine is cancelled, processing the workflow tree and emitting
* updates by calling [onRendering] and [onOutput].
*
* This function is the lowest-level entry point into the runtime. Don't call this directly, instead
* call [com.squareup.workflow.runWorkflowIn].
*/
suspend fun <InputT, StateT, OutputT : Any, RenderingT> runWorkflowLoop(
workflow: StatefulWorkflow<InputT, StateT, OutputT, RenderingT>,
inputs: Flow<InputT>,
initialSnapshot: Snapshot?,
initialState: StateT? = null,
onRendering: suspend (RenderingAndSnapshot<RenderingT>) -> Unit,
onOutput: suspend (OutputT) -> Unit
): Nothing
}

try {
while (true) {
coroutineContext.ensureActive()
internal object RealWorkflowLoop : WorkflowLoop {

val rendering = rootNode.render(workflow, input)
val snapshot = rootNode.snapshot(workflow)
@UseExperimental(FlowPreview::class, ExperimentalCoroutinesApi::class)
override suspend fun <InputT, StateT, OutputT : Any, RenderingT> runWorkflowLoop(
workflow: StatefulWorkflow<InputT, StateT, OutputT, RenderingT>,
inputs: Flow<InputT>,
initialSnapshot: Snapshot?,
initialState: StateT?,
onRendering: suspend (RenderingAndSnapshot<RenderingT>) -> Unit,
onOutput: suspend (OutputT) -> Unit
): Nothing = coroutineScope {
val inputsChannel = inputs.produceIn(this)
inputsChannel.consume {
var output: OutputT? = null
var input: InputT = inputsChannel.receive()
var inputsClosed = false
val rootNode = WorkflowNode(
id = workflow.id(),
workflow = workflow,
initialInput = input,
snapshot = initialSnapshot,
baseContext = coroutineContext,
initialState = initialState
)

onRendering(RenderingAndSnapshot(rendering, snapshot))
output?.let { onOutput(it) }
try {
while (true) {
coroutineContext.ensureActive()

// Tick _might_ return an output, but if it returns null, it means the state or a child
// probably changed, so we should re-render/snapshot and emit again.
output = select {
// Stop trying to read from the inputs channel after it's closed.
if (!inputsClosed) {
@Suppress("EXPERIMENTAL_API_USAGE")
inputsChannel.onReceiveOrNull { newInput ->
if (newInput == null) {
inputsClosed = true
} else {
input = newInput
val rendering = rootNode.render(workflow, input)
val snapshot = rootNode.snapshot(workflow)

onRendering(RenderingAndSnapshot(rendering, snapshot))
output?.let { onOutput(it) }

// Tick _might_ return an output, but if it returns null, it means the state or a child
// probably changed, so we should re-render/snapshot and emit again.
output = select {
// Stop trying to read from the inputs channel after it's closed.
if (!inputsClosed) {
@Suppress("EXPERIMENTAL_API_USAGE")
inputsChannel.onReceiveOrNull { newInput ->
if (newInput == null) {
inputsClosed = true
} else {
input = newInput
}
// No output. Returning from the select will go to the top of the loop to do another
// render pass.
return@onReceiveOrNull null
}
// No output. Returning from the select will go to the top of the loop to do another
// render pass.
return@onReceiveOrNull null
}
}

// Tick the workflow tree.
rootNode.tick(this) { it }
// Tick the workflow tree.
rootNode.tick(this) { it }
}
}
// Compiler gets confused, and thinks both that this throw is unreachable, and without the
// throw that the infinite while loop will exit normally and thus need a return statement.
@Suppress("UNREACHABLE_CODE", "ThrowableNotThrown")
throw AssertionError()
} finally {
// There's a potential race condition if the producer coroutine is cancelled before it has a
// chance to enter the try block, since we can't use CoroutineStart.ATOMIC. However, until we
// actually see this cause problems, I'm not too worried about it.
// See https://github.com/Kotlin/kotlinx.coroutines/issues/845
rootNode.cancel()
}
// Compiler gets confused, and thinks both that this throw is unreachable, and without the
// throw that the infinite while loop will exit normally and thus need a return statement.
@Suppress("UNREACHABLE_CODE", "ThrowableNotThrown")
throw AssertionError()
} finally {
// There's a potential race condition if the producer coroutine is cancelled before it has a
// chance to enter the try block, since we can't use CoroutineStart.ATOMIC. However, until we
// actually see this cause problems, I'm not too worried about it.
// See https://github.com/Kotlin/kotlinx.coroutines/issues/845
rootNode.cancel()
}
}
}
Loading

0 comments on commit dab29a0

Please sign in to comment.