Flow.materialize/dematerialize operators #2092

Open
elizarov opened this issue Jun 16, 2020 · 4 comments

@elizarov
Contributor

This is a stub issue to introduce Flow.materialize and Flow.dematerialize operators that would materialize flow completion (both normal completion and error) and would be primarily designed to integrate with sharing operators (see #2047). The detailed design is TBD.

@eduanb

eduanb commented Feb 15, 2023

My 2c on why this should be given higher priority.

There are real-world use cases for dividing up a Flow using filters and processing the parts separately. Take, for instance, reading a large log file. You'd like to handle INFO/WARN/DEBUG logs separately, so you share the flow and filter each type individually. However, this code never terminates:

val logsFlow: Flow<LogMessage> = readLogFile()
val logsSharedFlow = logsFlow.shareIn(this, Eagerly)
val infoLogs = logsSharedFlow.filter { it.type == INFO }.count()
val warnLogs = logsSharedFlow.filter { it.type == WARN }.count()

I'm using count here as an example (which could be achieved in other ways); it will be much more complex in practice.
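To make the non-termination concrete, here is a minimal sketch. The function name countSharedWithTimeout, the sample values, and the separate sharing scope are assumptions made for illustration and are not part of the original snippet. It relies on the documented behavior that collecting a SharedFlow never completes normally, so a terminal operator such as count() only returns if something cancels it.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: returns the element count, or null if count()
// is still waiting for completion when the timeout expires.
suspend fun countSharedWithTimeout(timeoutMillis: Long): Int? {
    // A dedicated scope (an assumption of this sketch) so the sharing
    // coroutine can be cancelled explicitly at the end.
    val sharingScope = CoroutineScope(Dispatchers.Default)
    try {
        val shared = flowOf("INFO", "WARN", "INFO")
            .shareIn(sharingScope, SharingStarted.Eagerly, replay = 3)
        // The upstream finishes, but the shared flow does not signal
        // completion to its collectors, so count() keeps waiting.
        return withTimeoutOrNull(timeoutMillis) { shared.count() }
    } finally {
        sharingScope.cancel()
    }
}

fun main() = runBlocking {
    println(countSharedWithTimeout(500)) // prints null
}
```

Without the timeout, the call to count() would suspend indefinitely, which is the behavior described above.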

This was possible a while back using broadcastIn, but this functionality has since been removed. See this answer: https://stackoverflow.com/questions/57807545/how-to-split-a-kotlin-flow-into-2-flows

So as I see it, you have 4 options at the moment:

  • Use a standard flow and cause the original producer to be called multiple times for each filter (reading the log file multiple times), which is very expensive.
  • Use a SharedFlow and not have the program terminate, so not a solution.
  • Immediately collect it to a list and use list/sequence operations, but then why have a flow at all?
  • Add your own VERY complex materialize function. See how it took users a YEAR of iterating to get a complete solution in "Unable to get intended replay configuration with completing shared flows" (#2890).

As far as I understand, this is what the original shareIn proposal (#1261) was about.

In terms of materialize/dematerialize, I find it a bad API. Here are my reasons:

  • The learning curve of materialized flows. This is on top of Flows, StateFlow, SharedFlow etc.
  • The confusion of non-terminating shared flows had to be documented, and IntelliJ warnings were added. This could have been avoided entirely.
  • According to this proposal, an additional Flow.materialize call has to be added.

I would propose that SharedFlow be materialized by default. Or at least add some parameter to shareIn(), like materialize=true, or create a materializedShareIn() shorthand.

@pacher

pacher commented Feb 15, 2023

@eduanb This was discussed multiple times and the answer always was (example) that share is the wrong operator for this use case and that there should be a dedicated replicate operator.
But I don't think it is anywhere on the roadmap, and it will not be implemented any time soon. There is no open issue about it to begin with.

@eduanb

eduanb commented Feb 15, 2023

@pacher Thank you for sharing. After reading more from that post, it seems like replicate is what I need. I originally thought that materialize would solve it. It's unfortunate that it is not being looked at right now, as it is a common use case, especially on the server side.

Side note: a replicate operator reminds me of the Wire Tap pattern from Enterprise Integration Patterns (EIP). That might be another possible solution/naming worth exploring.

@elizarov
Contributor Author

Just in case anyone actually needs materialize/dematerialize (with all the caveats that usually it means you are looking at your problem from the wrong perspective), they are extremely easy to implement in your own code:

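import kotlinx.coroutines.flow.*

// A flow element that is either a regular value or the terminal completion signal.
sealed class ValueOrCompletion<out T> {
    data class Value<out T>(val value: T) : ValueOrCompletion<T>()
    // exception == null means the upstream flow completed normally.
    data class Completion(val exception: Throwable?) : ValueOrCompletion<Nothing>()
}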

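// Wraps every upstream value and appends exactly one Completion marker,
// carrying the failure if there was one.
// Note: runCatching catches every Throwable, including CancellationException.
fun <T> Flow<T>.materializeCompletion(): Flow<ValueOrCompletion<T>> = flow {
    val result = runCatching {
        collect { it -> emit(ValueOrCompletion.Value(it)) }
    }
    emit(ValueOrCompletion.Completion(result.exceptionOrNull()))
}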

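// Unwraps values and ends the flow at the first Completion marker,
// rethrowing its exception if one is present.
fun <T> Flow<ValueOrCompletion<T>>.dematerializeCompletion(): Flow<T> = transformWhile { vc ->
    when (vc) {
        is ValueOrCompletion.Value -> {
            emit(vc.value)
            true // keep collecting
        }
        is ValueOrCompletion.Completion -> {
            vc.exception?.let { throw it }
            false // normal completion: stop collecting
        }
    }
}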
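As a rough illustration of how these two helpers might be combined with shareIn for the log-counting case discussed earlier in the thread, here is a sketch under several assumptions. LogType, the body of readLogFile, countInfoAndWarn, the sample messages, and the replay size of 16 are all hypothetical. The replay buffer is simply chosen large enough to hold the whole sample plus the completion marker, so that late subscribers miss nothing; that would not scale to a large file. The helpers are repeated only so the block compiles on its own.

```kotlin
import java.util.concurrent.atomic.AtomicInteger
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Helpers repeated from the comment above so this sketch is self-contained.
sealed class ValueOrCompletion<out T> {
    data class Value<out T>(val value: T) : ValueOrCompletion<T>()
    data class Completion(val exception: Throwable?) : ValueOrCompletion<Nothing>()
}

fun <T> Flow<T>.materializeCompletion(): Flow<ValueOrCompletion<T>> = flow {
    val result = runCatching { collect { emit(ValueOrCompletion.Value(it)) } }
    emit(ValueOrCompletion.Completion(result.exceptionOrNull()))
}

fun <T> Flow<ValueOrCompletion<T>>.dematerializeCompletion(): Flow<T> = transformWhile { vc ->
    when (vc) {
        is ValueOrCompletion.Value -> { emit(vc.value); true }
        is ValueOrCompletion.Completion -> { vc.exception?.let { throw it }; false }
    }
}

// Hypothetical types and producer, standing in for the real log reader.
enum class LogType { INFO, WARN, DEBUG }
data class LogMessage(val type: LogType, val text: String)

fun readLogFile(reads: AtomicInteger): Flow<LogMessage> = flow {
    reads.incrementAndGet() // counts how often the "file" is actually read
    emit(LogMessage(LogType.INFO, "started"))
    emit(LogMessage(LogType.WARN, "disk almost full"))
    emit(LogMessage(LogType.INFO, "request served"))
    emit(LogMessage(LogType.DEBUG, "cache miss"))
}

// Shares the materialized flow, then dematerializes per subscriber so that
// each count() terminates when the Completion marker arrives.
suspend fun countInfoAndWarn(logs: Flow<LogMessage>): Pair<Int, Int> = coroutineScope {
    val sharingScope = CoroutineScope(Dispatchers.Default)
    try {
        val shared = logs.materializeCompletion()
            // Assumption: replay = 16 covers every element of this small sample.
            .shareIn(sharingScope, SharingStarted.Eagerly, replay = 16)
        val info = async { shared.dematerializeCompletion().count { it.type == LogType.INFO } }
        val warn = async { shared.dematerializeCompletion().count { it.type == LogType.WARN } }
        info.await() to warn.await()
    } finally {
        sharingScope.cancel()
    }
}

fun main() = runBlocking {
    val reads = AtomicInteger(0)
    val (info, warn) = countInfoAndWarn(readLogFile(reads))
    println("info=$info warn=$warn reads=${reads.get()}") // prints info=2 warn=1 reads=1
}
```

Both counts finish, and the producer runs only once. For a stream too large to replay, sharing would have to start only after all subscribers are attached, which is closer to the replicate operator mentioned earlier in the thread.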
