-
Notifications
You must be signed in to change notification settings - Fork 69
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Delete flatMapWorkflow #377
Conversation
fb1a6d9
to
54b8abc
Compare
TODO:
|
54b8abc
to
66e054b
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the WorkflowRunnerViewModel case, if the channel comes from a subscription to a Flowable/Observable, there is nothing appropriate to own that channel and take care of cancelling it.
I'm confused as to what the actual problem is. My impression is that subscribing to the observable returned from flatMapWorkflow opens the channel and unsubscribing closes it. The View Model has a well defined lifetime, owns that subscription, and so by extension owns the channel; thus this method:
override fun onCleared() {
// Has the side effect of closing the updates channel, which in turn
// will fire any tear downs registered by the root workflow.
sub.dispose()
}
Does that not actually work?
Yea it will work, but it's not clear from the type |
Some context here: https://github.com/square/workflow/pull/375/files#r288690389 |
66e054b
to
bdf6ded
Compare
Updated the PR description with more context. |
bdf6ded
to
769f298
Compare
Thanks for the beefed up description, very helpful. I lean against tying our capstone API to something experimental — shining lines are a lot easier to communicate. I think that risk is mitigated by not using any operators isn't a discipline we can expect our clients to internalize, so we'd be inviting them to make a mess for themselves. If you don't see other landmines around the What will happen if the inputs channel is closed, btw? And maybe same question for a Flow approach. If we were using an Rx Flowable or Observable, those can finish before we unsubscribe — does Flow have a similar terminal event? |
If the input channel/flow closes while the workflow is running, the workflow keeps running but never gets any more input updates. |
To me that makes the |
2cdb310
to
6ba47d2
Compare
Removed Flow stuff, replaced with |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we need another round on WorkflowRunner. I'll be at my desk the rest of the day after lunch, we should be able to nail it down w/a quick chat.
@@ -63,18 +68,38 @@ interface WorkflowRunner<out OutputT> { | |||
activity: FragmentActivity, | |||
viewRegistry: ViewRegistry, | |||
workflow: Workflow<InputT, OutputT, Any>, | |||
inputs: Flowable<InputT>, | |||
inputs: () -> ReceiveChannel<InputT>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should document here that we will tear down the channel, no? Could go into this being a proxy for a cold observable, etc.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- add docs
@@ -45,7 +50,7 @@ interface WorkflowRunner<out OutputT> { | |||
* A stream of the [output][OutputT] values emitted by the running | |||
* [Workflow][com.squareup.workflow.Workflow]. | |||
*/ | |||
val output: Observable<out OutputT> | |||
val output: Flowable<out OutputT> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Our API on this class is kind of a weird hybrid now, with a ReceiveChannel
as its fundamental input but Rx objects as its output. Would it be too restrictive to stick with Flowable
for input as well? Too awkward to use channels for the outputs?
Once Flow stabilizes, will we use it for output as well as input?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here's a thought: what if the WorkflowHost became a public member on the WorkflowRunner, with the Rx methods as conveniences? Could even extract them to a workflow-ui-android-rx
module in a follow up, if that's not too fine grained.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Our API on this class is kind of a weird hybrid now, with a
ReceiveChannel
as its fundamental input but Rx objects as its output. Would it be too restrictive to stick withFlowable
for input as well?
Oh yea, definitely.
Once Flow stabilizes, will we use it for output as well as input?
Definitely.
what if the WorkflowHost became a public member on the WorkflowRunner
WorkflowHost
's API is still a bit of a footgun, so I'd like to hold off for now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- remove channel public API
…stead of the ReceiveChannel directly. Making the input cold lets the `WorkflowHost` itself own the life of the channel and take responsibility for closing it, which makes creating a host from, e.g., an `Observable` much easier.
6ba47d2
to
9420add
Compare
Channel<T>(capacity = 1) | ||
.apply { offer(value) } | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are { }
necessary here for return statement?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What would be the alternative?
return fun(): ReceiveChannel<T> {
Channel…
}
is just more verbose.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My bad, misread the code :)
Broken into multiple commits for your convenience.
tl;dr:
flatMapWorkflow
is a bad API because it's extremely easy to misuse, and hard to use correctly. It doesn't provide any advantages overWorkflowHost
, which is more flexible and can be made even easier to use correctly.flatMapWorkflow
is fundamentally a bad API for hosting workflows. A running workflow "session" naturally has two main outputs: renderings (along with snapshots), and outputs. The rendering stream is a "property" stream – only the last-emitted rendering actually matters, there needn't be backpressure handling, and the latest rendering can be cached (e.g. using RxJava'sreplay(1)
). Outputs, however, are very hot streams – an output should never be emitted twice (i.e. never be cached), and all outputs should be emitted (i.e. buffering on backpressure). It is impossible to express adequately this using a single stream, so we're left to write tons of documentation and leave it up to the caller to figure out how to do things correctly.WorkflowHost
has the ability to split these outputs itself, ensuring that the correct behavior is used for each. Note that it doesn't currently do this (there's a singleupdates
channel still), but we have better primitives for streams (i.e. Flow stabilizes), we can improve the API.WorkflowHost
also plays nice with dependency injection because it has aFactory
. We're not making use of that in this PR just yet, but this change unblocks our ability to do so.WorkflowRunnerViewModel
now usesWorkflowHost
directly.Closes #311 and closes #320 by making them obsolete.
This PR changes the type of the "inputs" parameter from
ReceiveChannel<InputT>
toFlow<InputT>
. I would love feedback on this decision. The reason for this change is because passing a raw channel requires the caller to be responsible for cancelling the channel. In theWorkflowRunnerViewModel
case, if the channel comes from a subscription to aFlowable
/Observable
, there is nothing appropriate to own that channel and take care of cancelling it. The solution is to make the input stream "cold", so the host (which has a well-defined lifetime) can get the channel when it needs it and then also be responsible for closing it. There are two alternatives:() -> ReceiveChannel<InputT>
: Simplest definition of a "cold channel" – just a function that returns a channel. However the type doesn't indicate clearly whether the caller is still responsible for closing that channel (in the case of{ someChannel }
), so it would be by one-off convention only and require documentation. The best part of this solution is it doesn't use any inherently experimental APIs, but at the cost of being less expressive.Flow<InputT>
: AFlow
is a "cold stream" by definition, which is, I think, ultimately the correct high-level type for the input. UsingFlow
for the type clearly expresses the contract between the caller and theWorkflowHost
– there's no ambiguity around who needs to clean up resources. However, Flow is still a very immature, unstable API. I think that risk is mitigated by not using any operators. We're only using it to express the contract that could also be implemented as() -> ReceiveChannel<InputT>
. Note that all usages are annotated with the correct experimental APIs. It is also mitigated by keepingFlow
out of the API that 99% of actual feature code will ever use, or at least by providing alternative APIs (e.g. adapters for RxJava).Do you agree the API stability risk is worth the expressiveness?