Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace WorkflowHost with the launchWorkflowIn function. #447

Merged
merged 2 commits into from
Jul 9, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,18 @@ import com.googlecode.lanterna.screen.TerminalScreen
import com.googlecode.lanterna.terminal.DefaultTerminalFactory
import com.squareup.workflow.Worker
import com.squareup.workflow.Workflow
import com.squareup.workflow.WorkflowHost
import com.squareup.workflow.asWorker
import com.squareup.workflow.launchWorkflowIn
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.ObsoleteCoroutinesApi
import kotlinx.coroutines.async
import kotlinx.coroutines.channels.BroadcastChannel
import kotlinx.coroutines.channels.Channel.Factory.CONFLATED
import kotlinx.coroutines.cancelChildren
import kotlinx.coroutines.channels.ConflatedBroadcastChannel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.first
Expand All @@ -42,21 +42,17 @@ import kotlinx.coroutines.launch
import kotlinx.coroutines.plus
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.selects.selectUnbiased
import kotlin.coroutines.EmptyCoroutineContext

/**
* Hosts [Workflow]s that:
* - gets information about the terminal configuration as input
* - renders the text to display on the terminal
* - finishes by emitting an exit code that should be passed to [kotlin.system.exitProcess].
*
* @param hostFactory Used to create the actual [WorkflowHost] that hosts workflows. Any dispatcher
* configured on the host will be ignored, to ensure that key events stay in sync with renderings.
* @param ioDispatcher Defaults to [Dispatchers.IO] and is used to listen for key events using
* blocking APIs.
*/
class TerminalWorkflowRunner(
private val hostFactory: WorkflowHost.Factory = WorkflowHost.Factory(EmptyCoroutineContext),
private val ioDispatcher: CoroutineDispatcher = Dispatchers.IO
) {

Expand All @@ -74,8 +70,6 @@ class TerminalWorkflowRunner(
// when invoking from coroutines. This entire function is blocking however, so we don't care.
@Suppress("BlockingMethodInNonBlockingContext")
fun run(workflow: TerminalWorkflow): ExitCode = runBlocking {
val configs = BroadcastChannel<TerminalInput>(CONFLATED)
val host = hostFactory.run(workflow, configs.asFlow(), context = coroutineContext)
val keyStrokesChannel = screen.listenForKeyStrokesOn(this + ioDispatcher)
val keyStrokesWorker = keyStrokesChannel.asWorker()
val resizes = screen.terminal.listenForResizesOn(this)
Expand All @@ -86,7 +80,7 @@ class TerminalWorkflowRunner(
try {
screen.startScreen()
try {
runTerminalWorkflow(screen, configs, host, keyStrokesWorker, resizes)
return@runBlocking runTerminalWorkflow(workflow, screen, keyStrokesWorker, resizes)
} finally {
screen.stopScreen()
}
Expand All @@ -102,67 +96,67 @@ class TerminalWorkflowRunner(
@Suppress("BlockingMethodInNonBlockingContext")
@UseExperimental(FlowPreview::class, ExperimentalCoroutinesApi::class)
private suspend fun runTerminalWorkflow(
workflow: TerminalWorkflow,
screen: TerminalScreen,
inputs: SendChannel<TerminalInput>,
host: WorkflowHost<ExitCode, TerminalRendering>,
keyStrokes: Worker<KeyStroke>,
resizes: ReceiveChannel<TerminalSize>
): ExitCode = coroutineScope {
var input = TerminalInput(screen.terminalSize.toSize(), keyStrokes)
val inputs = ConflatedBroadcastChannel(input)

// Use the result as the parent Job of the runtime coroutine so it gets cancelled automatically
// if there's an error.
val result =
launchWorkflowIn(this, workflow, inputs.asFlow()) { renderingsAndSnapshots, outputs ->
val renderings = renderingsAndSnapshots.map { it.rendering }
.produceIn(this)

launch {
while (true) {
val rendering = selectUnbiased<TerminalRendering> {
resizes.onReceive {
screen.doResizeIfNecessary()
?.let {
// If the terminal was resized since the last iteration, we need to notify the
// workflow.
input = input.copy(size = it.toSize())
}

// Publish config changes to the workflow.
inputs.send(input)

// Sending that new input invalidated the lastRendering, so we don't want to
// re-iterate until we have a new rendering with a fresh event handler. It also
// triggered a render pass, so we can just retrieve that immediately.
return@onReceive renderings.receive()
}

renderings.onReceive { it }
}

// Need to send an initial input for the workflow to start running.
inputs.offer(input)

// Launch the render loop in a new coroutine, so this coroutine can just sit around and wait
// for the workflow to emit an output.
val renderJob = launch {
val renderings = host.renderingsAndSnapshots.map { it.rendering }
.produceIn(this)

while (true) {
val rendering = selectUnbiased<TerminalRendering> {
resizes.onReceive {
screen.doResizeIfNecessary()
?.let {
// If the terminal was resized since the last iteration, we need to notify the
// workflow.
input = input.copy(size = it.toSize())
screen.clear()
screen.newTextGraphics()
.apply {
foregroundColor = rendering.textColor.toTextColor()
backgroundColor = rendering.backgroundColor.toTextColor()
rendering.text.lineSequence()
.forEachIndexed { index, line ->
putString(TOP_LEFT_CORNER.withRelativeRow(index), line)
}
}

// Publish config changes to the workflow.
inputs.send(input)

// Sending that new input invalidated the lastRendering, so we don't want to
// re-iterate until we have a new rendering with a fresh event handler. It also
// triggered a render pass, so we can just retrieve that immediately.
return@onReceive renderings.receive()
screen.refresh(COMPLETE)
}

renderings.onReceive { it }
}

screen.clear()
screen.newTextGraphics()
.apply {
foregroundColor = rendering.textColor.toTextColor()
backgroundColor = rendering.backgroundColor.toTextColor()
rendering.text.lineSequence()
.forEachIndexed { index, line ->
putString(TOP_LEFT_CORNER.withRelativeRow(index), line)
}
}

screen.refresh(COMPLETE)
return@launchWorkflowIn async { outputs.first() }
}
}

// Start collecting from outputs before starting the workflow host, in case it emits immediately.
val exitCodeDeferred = async { host.outputs.first() }
val workflowJob = host.start()

// Stop the runner and return the exit code as soon as the workflow emits one.
val exitCode = exitCodeDeferred.await()
workflowJob.cancel()
renderJob.cancel()
val exitCode = result.await()
// If we don't cancel the workflow runtime explicitly, coroutineScope will hang waiting for it to
// finish.
coroutineContext.cancelChildren(
CancellationException("TerminalWorkflowRunner completed with exit code $exitCode")
)
return@coroutineScope exitCode
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
/*
* Copyright 2019 Square Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.squareup.workflow

import com.squareup.workflow.internal.RealWorkflowLoop
import com.squareup.workflow.internal.WorkflowLoop
import com.squareup.workflow.internal.unwrapCancellationCause
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.Job
import kotlinx.coroutines.channels.BroadcastChannel
import kotlinx.coroutines.channels.ConflatedBroadcastChannel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.plus
import org.jetbrains.annotations.TestOnly

/**
* Don't use this typealias for the public API, better to just use the function directly so it's
* more obvious how to use it.
*/
@UseExperimental(ExperimentalCoroutinesApi::class)
internal typealias Configurator <O, R, T> = CoroutineScope.(
renderingsAndSnapshots: Flow<RenderingAndSnapshot<R>>,
outputs: Flow<O>
) -> T

/**
* Launches the [workflow] in a new coroutine in [scope]. The workflow tree is seeded with
* [initialSnapshot] and the first value emitted by [inputs]. Subsequent values emitted from
* [inputs] will be used to re-render the workflow.
*
* This is the primary low-level entry point into the workflow runtime. If you are writing an app,
* you should probably be using a higher-level entry point that will also let you define UI bindings
* for your renderings.
*
* ## Initialization
*
* Before starting the actual workflow runtime, this function will invoke [beforeStart] and pass
* it the [Flow]s of renderings, snapshots, and outputs, as well as a [CoroutineScope] that the
* runtime will be hosted in. The workflow runtime will not be started until after this function
* returns. This is to allow the output flow to start being collected before any outputs can
* actually be emitted. Collectors that start _after_ [beforeStart] returns may not receive outputs
zach-klippenstein marked this conversation as resolved.
Show resolved Hide resolved
* that are emitted very quickly. The value returned by [beforeStart] will be returned from this
* function after the runtime is launched, handy for instantiating platform-specific runner objects.
* The [CoroutineScope] passed to [beforeStart] can be used to do more than just cancel the runtime
* – it can also be used to start coroutines that will run until the workflow is cancelled,
* typically to collect the rendering and output [Flow]s.
*
* ## Scoping
*
* The workflow runtime makes use of
* [structured concurrency](https://medium.com/@elizarov/structured-concurrency-722d765aa952).
* The runtime is started in a specific [CoroutineScope], which defines the context for the entire
* workflow tree – most importantly, the [Job] that governs the runtime's lifetime and exception
* reporting path, and the [CoroutineDispatcher][kotlinx.coroutines.CoroutineDispatcher] that
* decides on what threads to run workflow code.
*
* This function creates a child [Job] in [scope] and uses it as the parent for the workflow
* runtime, and as the job passed to the [beforeStart] function. This means that if [beforeStart]
zach-klippenstein marked this conversation as resolved.
Show resolved Hide resolved
* calls [CoroutineScope.cancel][Job.cancel], it will cancel the workflow runtime, but will not cancel the
* [scope] passed into this function.
zach-klippenstein marked this conversation as resolved.
Show resolved Hide resolved
*
* @param scope The [CoroutineScope] in which to launch the workflow runtime. Any exceptions thrown
* in any workflows will be reported to this scope, and cancelling this scope will cancel the
* workflow runtime. The scope passed to [beforeStart] will be a child of this scope.
* @param workflow The root workflow to start.
* @param inputs Stream of input values for the root workflow. The first value emitted is passed to
* the root workflow's [StatefulWorkflow.initialState], and subsequent emissions are passed as
zach-klippenstein marked this conversation as resolved.
Show resolved Hide resolved
* input updates to the root workflow. If this flow completes before emitting anything, the runtime
* will fail (report an exception up through [scope]). If this flow completes _after_ emitting at
* least one value, the runtime will _not_ fail or stop, it will continue running with the
* last-emitted input.
* @param initialSnapshot If not null, used to restore the workflow.
* @param beforeStart Called exactly once with the flows for renderings/snapshots and outputs.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Insert after first sentence: Called on a newly created child [Job] of the given [scope], which defines the lifetime of the launched workflow tree. Cancelling that job ends the new workflow session.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reworded a bit, WDYT?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I still find "It also gets a sub-scope" confusing since the scope is this, not a parameter. That's what got me talking about receiver elsewhere, although I take your point that that's a weird term too -- this ain't Smalltalk. Called on doesn't work for you?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd be curious what the general opinion is on this. I still think of receivers as pretty much regular parameters, with slightly different syntax rules, but I have no reason to think that's a widely-shared mental model. "Called on" doesn't seem great to me though, since the receiver doesn't know anything about the method call – it makes sense from the caller's perspective, but not from the implementer's.

* It also gets a sub-scope of [scope] with a newly created child [Job] which defines the lifetime
* of the launched workflow tree. Cancelling that job ends the new workflow session.
* Note that if [scope] is already cancelled when this function is called, the receiver scope will
* also be cancelled, and the flows will complete immediately.
*
* @return The value returned by [beforeStart].
*/
@UseExperimental(ExperimentalCoroutinesApi::class)
fun <InputT, OutputT : Any, RenderingT, RunnerT> launchWorkflowIn(
scope: CoroutineScope,
workflow: Workflow<InputT, OutputT, RenderingT>,
inputs: Flow<InputT>,
initialSnapshot: Snapshot? = null,
beforeStart: CoroutineScope.(
renderingsAndSnapshots: Flow<RenderingAndSnapshot<RenderingT>>,
outputs: Flow<OutputT>
) -> RunnerT
): RunnerT = launchWorkflowImpl(
scope,
RealWorkflowLoop,
workflow.asStatefulWorkflow(),
inputs,
initialSnapshot = initialSnapshot,
initialState = null,
beforeStart = beforeStart
)

/**
* Launches the [workflow] in a new coroutine in [scope]. The workflow tree is seeded with
* [initialState] and the first value emitted by [inputs]. Subsequent values emitted from
* [inputs] will be used to re-render the workflow.
*
* See [launchWorkflowIn] for documentation about most of the parameters and behavior.
*
* @param initialState The [StateT] in which to start the root workflow.
*/
@TestOnly
@UseExperimental(ExperimentalCoroutinesApi::class)
fun <InputT, StateT, OutputT : Any, RenderingT, RunnerT> launchWorkflowForTestFromStateIn(
scope: CoroutineScope,
workflow: StatefulWorkflow<InputT, StateT, OutputT, RenderingT>,
inputs: Flow<InputT>,
initialState: StateT,
beforeStart: CoroutineScope.(
renderingsAndSnapshots: Flow<RenderingAndSnapshot<RenderingT>>,
outputs: Flow<OutputT>
) -> RunnerT
): RunnerT = launchWorkflowImpl(
scope,
RealWorkflowLoop,
workflow,
inputs,
initialState = initialState,
initialSnapshot = null,
beforeStart = beforeStart
)

@UseExperimental(ExperimentalCoroutinesApi::class, FlowPreview::class)
internal fun <InputT, StateT, OutputT : Any, RenderingT, RunnerT> launchWorkflowImpl(
scope: CoroutineScope,
workflowLoop: WorkflowLoop,
workflow: StatefulWorkflow<InputT, StateT, OutputT, RenderingT>,
inputs: Flow<InputT>,
initialSnapshot: Snapshot?,
initialState: StateT?,
beforeStart: Configurator<OutputT, RenderingT, RunnerT>
): RunnerT {
val renderingsAndSnapshots = ConflatedBroadcastChannel<RenderingAndSnapshot<RenderingT>>()
val outputs = BroadcastChannel<OutputT>(capacity = 1)
val workflowJob = Job(parent = scope.coroutineContext[Job])
val workflowScope = scope + workflowJob

// Ensure we close the channels when we're done, so that they propagate errors.
workflowJob.invokeOnCompletion { cause ->
// We need to unwrap the cancellation exception so that we *complete* the channels instead
// of cancelling them if our coroutine was merely cancelled.
val realCause = cause?.unwrapCancellationCause()
renderingsAndSnapshots.close(realCause)
outputs.close(realCause)
}

// Give the caller a chance to start collecting outputs.
val result = beforeStart(workflowScope, renderingsAndSnapshots.asFlow(), outputs.asFlow())

workflowScope.launch {
// Run the workflow processing loop forever, or until it fails or is cancelled.
workflowLoop.runWorkflowLoop(
workflow,
inputs,
initialSnapshot = initialSnapshot,
initialState = initialState,
onRendering = renderingsAndSnapshots::send,
onOutput = outputs::send
)
}

return result
}
Loading