Skip to content

Commit

Permalink
Replace WorkflowHost with the runWorkflow function.
Browse files Browse the repository at this point in the history
Closes #439.
  • Loading branch information
zach-klippenstein committed Jul 9, 2019
1 parent 8786269 commit 950136c
Show file tree
Hide file tree
Showing 12 changed files with 910 additions and 851 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,18 @@ import com.googlecode.lanterna.screen.TerminalScreen
import com.googlecode.lanterna.terminal.DefaultTerminalFactory
import com.squareup.workflow.Worker
import com.squareup.workflow.Workflow
import com.squareup.workflow.WorkflowHost
import com.squareup.workflow.asWorker
import com.squareup.workflow.launchWorkflowIn
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.ObsoleteCoroutinesApi
import kotlinx.coroutines.async
import kotlinx.coroutines.channels.BroadcastChannel
import kotlinx.coroutines.channels.Channel.Factory.CONFLATED
import kotlinx.coroutines.cancelChildren
import kotlinx.coroutines.channels.ConflatedBroadcastChannel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.first
Expand All @@ -42,21 +42,17 @@ import kotlinx.coroutines.launch
import kotlinx.coroutines.plus
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.selects.selectUnbiased
import kotlin.coroutines.EmptyCoroutineContext

/**
* Hosts [Workflow]s that:
* - gets information about the terminal configuration as input
* - renders the text to display on the terminal
* - finishes by emitting an exit code that should be passed to [kotlin.system.exitProcess].
*
* @param hostFactory Used to create the actual [WorkflowHost] that hosts workflows. Any dispatcher
* configured on the host will be ignored, to ensure that key events stay in sync with renderings.
* @param ioDispatcher Defaults to [Dispatchers.IO] and is used to listen for key events using
* blocking APIs.
*/
class TerminalWorkflowRunner(
private val hostFactory: WorkflowHost.Factory = WorkflowHost.Factory(EmptyCoroutineContext),
private val ioDispatcher: CoroutineDispatcher = Dispatchers.IO
) {

Expand All @@ -74,8 +70,6 @@ class TerminalWorkflowRunner(
// when invoking from coroutines. This entire function is blocking however, so we don't care.
@Suppress("BlockingMethodInNonBlockingContext")
fun run(workflow: TerminalWorkflow): ExitCode = runBlocking {
val configs = BroadcastChannel<TerminalInput>(CONFLATED)
val host = hostFactory.run(workflow, configs.asFlow(), context = coroutineContext)
val keyStrokesChannel = screen.listenForKeyStrokesOn(this + ioDispatcher)
val keyStrokesWorker = keyStrokesChannel.asWorker()
val resizes = screen.terminal.listenForResizesOn(this)
Expand All @@ -86,7 +80,7 @@ class TerminalWorkflowRunner(
try {
screen.startScreen()
try {
runTerminalWorkflow(screen, configs, host, keyStrokesWorker, resizes)
return@runBlocking runTerminalWorkflow(workflow, screen, keyStrokesWorker, resizes)
} finally {
screen.stopScreen()
}
Expand All @@ -102,67 +96,67 @@ class TerminalWorkflowRunner(
@Suppress("BlockingMethodInNonBlockingContext")
@UseExperimental(FlowPreview::class, ExperimentalCoroutinesApi::class)
private suspend fun runTerminalWorkflow(
workflow: TerminalWorkflow,
screen: TerminalScreen,
inputs: SendChannel<TerminalInput>,
host: WorkflowHost<ExitCode, TerminalRendering>,
keyStrokes: Worker<KeyStroke>,
resizes: ReceiveChannel<TerminalSize>
): ExitCode = coroutineScope {
var input = TerminalInput(screen.terminalSize.toSize(), keyStrokes)
val inputs = ConflatedBroadcastChannel(input)

// Use the result as the parent Job of the runtime coroutine so it gets cancelled automatically
// if there's an error.
val result =
launchWorkflowIn(this, workflow, inputs.asFlow()) { renderingsAndSnapshots, outputs ->
val renderings = renderingsAndSnapshots.map { it.rendering }
.produceIn(this)

launch {
while (true) {
val rendering = selectUnbiased<TerminalRendering> {
resizes.onReceive {
screen.doResizeIfNecessary()
?.let {
// If the terminal was resized since the last iteration, we need to notify the
// workflow.
input = input.copy(size = it.toSize())
}

// Publish config changes to the workflow.
inputs.send(input)

// Sending that new input invalidated the lastRendering, so we don't want to
// re-iterate until we have a new rendering with a fresh event handler. It also
// triggered a render pass, so we can just retrieve that immediately.
return@onReceive renderings.receive()
}

renderings.onReceive { it }
}

// Need to send an initial input for the workflow to start running.
inputs.offer(input)

// Launch the render loop in a new coroutine, so this coroutine can just sit around and wait
// for the workflow to emit an output.
val renderJob = launch {
val renderings = host.renderingsAndSnapshots.map { it.rendering }
.produceIn(this)

while (true) {
val rendering = selectUnbiased<TerminalRendering> {
resizes.onReceive {
screen.doResizeIfNecessary()
?.let {
// If the terminal was resized since the last iteration, we need to notify the
// workflow.
input = input.copy(size = it.toSize())
screen.clear()
screen.newTextGraphics()
.apply {
foregroundColor = rendering.textColor.toTextColor()
backgroundColor = rendering.backgroundColor.toTextColor()
rendering.text.lineSequence()
.forEachIndexed { index, line ->
putString(TOP_LEFT_CORNER.withRelativeRow(index), line)
}
}

// Publish config changes to the workflow.
inputs.send(input)

// Sending that new input invalidated the lastRendering, so we don't want to
// re-iterate until we have a new rendering with a fresh event handler. It also
// triggered a render pass, so we can just retrieve that immediately.
return@onReceive renderings.receive()
screen.refresh(COMPLETE)
}

renderings.onReceive { it }
}

screen.clear()
screen.newTextGraphics()
.apply {
foregroundColor = rendering.textColor.toTextColor()
backgroundColor = rendering.backgroundColor.toTextColor()
rendering.text.lineSequence()
.forEachIndexed { index, line ->
putString(TOP_LEFT_CORNER.withRelativeRow(index), line)
}
}

screen.refresh(COMPLETE)
return@launchWorkflowIn async { outputs.first() }
}
}

// Start collecting from outputs before starting the workflow host, in case it emits immediately.
val exitCodeDeferred = async { host.outputs.first() }
val workflowJob = host.start()

// Stop the runner and return the exit code as soon as the workflow emits one.
val exitCode = exitCodeDeferred.await()
workflowJob.cancel()
renderJob.cancel()
val exitCode = result.await()
// If we don't cancel the workflow runtime explicitly, coroutineScope will hang waiting for it to
// finish.
coroutineContext.cancelChildren(
CancellationException("TerminalWorkflowRunner completed with exit code $exitCode")
)
return@coroutineScope exitCode
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
/*
* Copyright 2019 Square Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.squareup.workflow

import com.squareup.workflow.internal.RealWorkflowLoop
import com.squareup.workflow.internal.WorkflowLoop
import com.squareup.workflow.internal.unwrapCancellationCause
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.Job
import kotlinx.coroutines.channels.BroadcastChannel
import kotlinx.coroutines.channels.ConflatedBroadcastChannel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.plus
import org.jetbrains.annotations.TestOnly

/**
* Don't use this typealias for the public API, better to just use the function directly so it's
* more obvious how to use it.
*/
@UseExperimental(ExperimentalCoroutinesApi::class)
internal typealias Configurator <O, R, T> = CoroutineScope.(
renderingsAndSnapshots: Flow<RenderingAndSnapshot<R>>,
outputs: Flow<O>
) -> T

/**
* Launches the [workflow] in a new coroutine in [scope]. The workflow tree is seeded with
* [initialSnapshot] and the first value emitted by [inputs]. Subsequent values emitted from
* [inputs] will be used to re-render the workflow.
*
* ## Initialization
*
* Before starting the actual workflow runtime, this function will invoke [beforeStart] and pass
* it the [Flow]s of renderings, snapshots, and outputs, as well as a [CoroutineScope] that the
* runtime will be hosted in. The workflow runtime will not be started until after this function
* returns. This is to allow the output flow to start being collected before any outputs can
* actually be emitted. Collectors that start _after_ [beforeStart] returns may not receive outputs
* that are emitted very quickly. The value returned by [beforeStart] will be returned from this
* function after the runtime is launched. The [CoroutineScope] passed to [beforeStart] can be used
* to do more than just cancel the runtime – it can also be used to start coroutines that will run
* until the workflow is cancelled, namely to collect the rendering and output [Flow]s.
*
* ## Scoping
*
* The workflow runtime makes use of
* [structured concurrency](https://medium.com/@elizarov/structured-concurrency-722d765aa952).
* The runtime is started in a specific [CoroutineScope], which defines the context for the entire
* workflow tree – most importantly, the [Job] that governs the runtime's lifetime and exception
* reporting path, and the [CoroutineDispatcher][kotlinx.coroutines.CoroutineDispatcher] that
* decides what on what threads to run workflow code.
*
* This function creates a child [Job] in [scope] and uses it as the parent for the workflow
* runtime, and as the job passed to the [beforeStart] function. This means that if [beforeStart]
* calls `CoroutineScope.cancel`, it will cancel the workflow runtime, but will not cancel the
* [scope] passed into this function.
*
* @param scope The [CoroutineScope] in which to launch the workflow runtime. Any exceptions thrown
* in any workflows will be reported to this scope, and cancelling this scope will cancel the
* workflow runtime.
* @param workflow The root workflow to start.
* @param inputs Stream of input values for the root workflow. The first value emitted is passed to
* the root workflow's [StatefulWorkflow.initialState], and subsequent emissions are passed as
* input updates to the root workflow.
* @param initialSnapshot If not null, used to restore the workflow.
* @param beforeStart Called exactly once with the flows for renderings/snapshots and outputs.
* Note that the receiver scope will already be cancelled, and the flows will complete immediately
* if this function was invoked from a cancelled [scope].
*
* @return The value returned by [beforeStart].
*/
@UseExperimental(ExperimentalCoroutinesApi::class, FlowPreview::class)
fun <InputT, OutputT : Any, RenderingT, ResultT> launchWorkflowIn(
scope: CoroutineScope,
workflow: Workflow<InputT, OutputT, RenderingT>,
inputs: Flow<InputT>,
initialSnapshot: Snapshot? = null,
beforeStart: CoroutineScope.(
renderingsAndSnapshots: Flow<RenderingAndSnapshot<RenderingT>>,
outputs: Flow<OutputT>
) -> ResultT
): ResultT = launchWorkflowImpl(
scope,
RealWorkflowLoop,
workflow.asStatefulWorkflow(),
inputs,
initialSnapshot = initialSnapshot,
initialState = null,
beforeStart = beforeStart
)

/**
* Launches the [workflow] in a new coroutine in [scope]. The workflow tree is seeded with
* [initialState] and the first value emitted by [inputs]. Subsequent values emitted from
* [inputs] will be used to re-render the workflow.
*
* See [launchWorkflowIn] for documentation about most of the parameters and behavior.
*
* @param initialState The [StateT] in which to start the root workflow.
*/
@TestOnly
@UseExperimental(ExperimentalCoroutinesApi::class, FlowPreview::class)
fun <InputT, StateT, OutputT : Any, RenderingT, ResultT> launchWorkflowForTestFromStateIn(
scope: CoroutineScope,
workflow: StatefulWorkflow<InputT, StateT, OutputT, RenderingT>,
inputs: Flow<InputT>,
initialState: StateT,
beforeStart: CoroutineScope.(
renderingsAndSnapshots: Flow<RenderingAndSnapshot<RenderingT>>,
outputs: Flow<OutputT>
) -> ResultT
): ResultT = launchWorkflowImpl(
scope,
RealWorkflowLoop,
workflow,
inputs,
initialState = initialState,
initialSnapshot = null,
beforeStart = beforeStart
)

@UseExperimental(ExperimentalCoroutinesApi::class, FlowPreview::class)
internal fun <InputT, StateT, OutputT : Any, RenderingT, ResultT> launchWorkflowImpl(
scope: CoroutineScope,
workflowLoop: WorkflowLoop,
workflow: StatefulWorkflow<InputT, StateT, OutputT, RenderingT>,
inputs: Flow<InputT>,
initialSnapshot: Snapshot?,
initialState: StateT?,
beforeStart: Configurator<OutputT, RenderingT, ResultT>
): ResultT {
val renderingsAndSnapshots = ConflatedBroadcastChannel<RenderingAndSnapshot<RenderingT>>()
val outputs = BroadcastChannel<OutputT>(capacity = 1)
val workflowJob = Job(parent = scope.coroutineContext[Job])
val workflowScope = scope + workflowJob

// Ensure we close the channels when we're done, so that they propagate errors.
workflowJob.invokeOnCompletion { cause ->
// We need to unwrap the cancellation exception so that we *complete* the channels instead
// of cancelling them if our coroutine was merely cancelled.
val realCause = cause?.unwrapCancellationCause()
renderingsAndSnapshots.close(realCause)
outputs.close(realCause)
}

// Give the caller a chance to start collecting outputs.
val result = beforeStart(workflowScope, renderingsAndSnapshots.asFlow(), outputs.asFlow())

workflowScope.launch {
// Run the workflow processing loop forever, or until it fails or is cancelled.
workflowLoop.runWorkflowLoop(
workflow,
inputs,
initialSnapshot = initialSnapshot,
initialState = initialState,
onRendering = renderingsAndSnapshots::send,
onOutput = outputs::send
)
}

return result
}
Loading

0 comments on commit 950136c

Please sign in to comment.