Skip to content

Commit

Permalink
wip: unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
zach-klippenstein committed Jul 7, 2019
1 parent 571fe8e commit 9625cf3
Show file tree
Hide file tree
Showing 3 changed files with 246 additions and 129 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@
*/
package com.squareup.workflow

import com.squareup.workflow.internal.runWorkflowLoop
import com.squareup.workflow.internal.RealWorkflowLoop
import com.squareup.workflow.internal.WorkflowLoop
import com.squareup.workflow.internal.unwrapCancellationCause
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.CoroutineStart.ATOMIC
Expand Down Expand Up @@ -60,6 +61,7 @@ fun <InputT, OutputT : Any, RenderingT, ResultT> runWorkflowIn(
) -> ResultT
): ResultT = runWorkflowImpl(
scope,
RealWorkflowLoop,
workflow.asStatefulWorkflow(),
inputs,
initialSnapshot = initialSnapshot,
Expand All @@ -83,6 +85,7 @@ fun <InputT, StateT, OutputT : Any, RenderingT, ResultT> runWorkflowForTestFromS
) -> ResultT
): ResultT = runWorkflowImpl(
scope,
RealWorkflowLoop,
workflow,
inputs,
initialState = initialState,
Expand All @@ -94,8 +97,9 @@ fun <InputT, StateT, OutputT : Any, RenderingT, ResultT> runWorkflowForTestFromS
* TODO write documentation
*/
@UseExperimental(ExperimentalCoroutinesApi::class, FlowPreview::class)
private fun <InputT, StateT, OutputT : Any, RenderingT, ResultT> runWorkflowImpl(
internal fun <InputT, StateT, OutputT : Any, RenderingT, ResultT> runWorkflowImpl(
scope: CoroutineScope,
workflowLoop: WorkflowLoop,
workflow: StatefulWorkflow<InputT, StateT, OutputT, RenderingT>,
inputs: Flow<InputT>,
initialSnapshot: Snapshot?,
Expand All @@ -121,7 +125,7 @@ private fun <InputT, StateT, OutputT : Any, RenderingT, ResultT> runWorkflowImpl

workflowScope.launch(start = ATOMIC) {
// Run the workflow processing loop forever, or until it fails or is cancelled.
runWorkflowLoop(
workflowLoop.runWorkflowLoop(
workflow,
inputs,
initialSnapshot = initialSnapshot,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,78 +27,94 @@ import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.produceIn
import kotlinx.coroutines.selects.select

/**
* Loops forever, or until the coroutine is cancelled, processing the workflow tree and emitting
* updates by calling [onRendering] and [onOutput].
*
* This function is the lowest-level entry point into the runtime. Don't call this directly, instead
* call [com.squareup.workflow.runWorkflowIn].
*/
@UseExperimental(FlowPreview::class, ExperimentalCoroutinesApi::class)
internal suspend fun <InputT, StateT, OutputT : Any, RenderingT> runWorkflowLoop(
workflow: StatefulWorkflow<InputT, StateT, OutputT, RenderingT>,
inputs: Flow<InputT>,
initialSnapshot: Snapshot?,
initialState: StateT? = null,
onRendering: suspend (RenderingAndSnapshot<RenderingT>) -> Unit,
onOutput: suspend (OutputT) -> Unit
): Nothing = coroutineScope {
val inputsChannel = inputs.produceIn(this)
inputsChannel.consume {
var output: OutputT? = null
var input: InputT = inputsChannel.receive()
var inputsClosed = false
val rootNode = WorkflowNode(
id = workflow.id(),
workflow = workflow,
initialInput = input,
snapshot = initialSnapshot,
baseContext = coroutineContext,
initialState = initialState
)
@UseExperimental(ExperimentalCoroutinesApi::class)
internal interface WorkflowLoop {

/**
* Loops forever, or until the coroutine is cancelled, processing the workflow tree and emitting
* updates by calling [onRendering] and [onOutput].
*
* This function is the lowest-level entry point into the runtime. Don't call this directly, instead
* call [com.squareup.workflow.runWorkflowIn].
*/
suspend fun <InputT, StateT, OutputT : Any, RenderingT> runWorkflowLoop(
workflow: StatefulWorkflow<InputT, StateT, OutputT, RenderingT>,
inputs: Flow<InputT>,
initialSnapshot: Snapshot?,
initialState: StateT? = null,
onRendering: suspend (RenderingAndSnapshot<RenderingT>) -> Unit,
onOutput: suspend (OutputT) -> Unit
): Nothing
}

try {
while (true) {
coroutineContext.ensureActive()
internal object RealWorkflowLoop : WorkflowLoop {

val rendering = rootNode.render(workflow, input)
val snapshot = rootNode.snapshot(workflow)
@UseExperimental(FlowPreview::class, ExperimentalCoroutinesApi::class)
override suspend fun <InputT, StateT, OutputT : Any, RenderingT> runWorkflowLoop(
workflow: StatefulWorkflow<InputT, StateT, OutputT, RenderingT>,
inputs: Flow<InputT>,
initialSnapshot: Snapshot?,
initialState: StateT?,
onRendering: suspend (RenderingAndSnapshot<RenderingT>) -> Unit,
onOutput: suspend (OutputT) -> Unit
): Nothing = coroutineScope {
val inputsChannel = inputs.produceIn(this)
inputsChannel.consume {
var output: OutputT? = null
var input: InputT = inputsChannel.receive()
var inputsClosed = false
val rootNode = WorkflowNode(
id = workflow.id(),
workflow = workflow,
initialInput = input,
snapshot = initialSnapshot,
baseContext = coroutineContext,
initialState = initialState
)

onRendering(RenderingAndSnapshot(rendering, snapshot))
output?.let { onOutput(it) }
try {
while (true) {
coroutineContext.ensureActive()

// Tick _might_ return an output, but if it returns null, it means the state or a child
// probably changed, so we should re-render/snapshot and emit again.
output = select {
// Stop trying to read from the inputs channel after it's closed.
if (!inputsClosed) {
@Suppress("EXPERIMENTAL_API_USAGE")
inputsChannel.onReceiveOrNull { newInput ->
if (newInput == null) {
inputsClosed = true
} else {
input = newInput
val rendering = rootNode.render(workflow, input)
val snapshot = rootNode.snapshot(workflow)

onRendering(RenderingAndSnapshot(rendering, snapshot))
output?.let { onOutput(it) }

// Tick _might_ return an output, but if it returns null, it means the state or a child
// probably changed, so we should re-render/snapshot and emit again.
output = select {
// Stop trying to read from the inputs channel after it's closed.
if (!inputsClosed) {
@Suppress("EXPERIMENTAL_API_USAGE")
inputsChannel.onReceiveOrNull { newInput ->
if (newInput == null) {
inputsClosed = true
} else {
input = newInput
}
// No output. Returning from the select will go to the top of the loop to do another
// render pass.
return@onReceiveOrNull null
}
// No output. Returning from the select will go to the top of the loop to do another
// render pass.
return@onReceiveOrNull null
}
}

// Tick the workflow tree.
rootNode.tick(this) { it }
// Tick the workflow tree.
rootNode.tick(this) { it }
}
}
// Compiler gets confused, and thinks both that this throw is unreachable, and without the
// throw that the infinite while loop will exit normally and thus need a return statement.
@Suppress("UNREACHABLE_CODE", "ThrowableNotThrown")
throw AssertionError()
} finally {
// There's a potential race condition if the producer coroutine is cancelled before it has a
// chance to enter the try block, since we can't use CoroutineStart.ATOMIC. However, until we
// actually see this cause problems, I'm not too worried about it.
// See https://github.com/Kotlin/kotlinx.coroutines/issues/845
rootNode.cancel()
}
// Compiler gets confused, and thinks both that this throw is unreachable, and without the
// throw that the infinite while loop will exit normally and thus need a return statement.
@Suppress("UNREACHABLE_CODE", "ThrowableNotThrown")
throw AssertionError()
} finally {
// There's a potential race condition if the producer coroutine is cancelled before it has a
// chance to enter the try block, since we can't use CoroutineStart.ATOMIC. However, until we
// actually see this cause problems, I'm not too worried about it.
// See https://github.com/Kotlin/kotlinx.coroutines/issues/845
rootNode.cancel()
}
}
}
Loading

0 comments on commit 9625cf3

Please sign in to comment.