Provide abstraction for cold streams #254

Closed
elizarov opened this issue Feb 21, 2018 · 116 comments

Comments

@elizarov
Contributor

All the currently provided channel abstractions in kotlinx.coroutines are hot. The data is produced regardless of the presence of a subscriber. This is good for data sources and applications that are inherently hot, like incoming network and UI events.

However, hot streams are not an ideal solution for cases where a data stream is produced on demand. Consider, for example, the following simple code that produces a ReceiveChannel<Int>:

produce<Int> { 
    while (true) {
        val x = computeNextValue()
        send(x)
    } 
}

One obvious downside is that computeNextValue() is invoked before send, so the next value gets computed even when the receiver is not ready. Of course, the producer gets suspended in send if there is no receiver, but it is not as lazy as what you get with a cold reactive Publisher/Observable/Flowable/Flux/Flow.

We need an abstraction for cold streams in kotlinx.coroutines that is just as lazy, computing data in "push" mode versus the "pull" mode of the hot channels that we have now.
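
A minimal sketch of the difference, not taken from the issue itself. It assumes a recent kotlinx.coroutines release in which Flow (the cold abstraction this issue was eventually closed with) is available; Source and the two helper functions are hypothetical names used only for illustration.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*

// Hypothetical stand-in for an expensive computation; it counts its own invocations.
class Source {
    var calls = 0
    fun computeNextValue(): Int = ++calls
}

// Hot: the producer coroutine starts on its own and computes before anyone receives.
@OptIn(ExperimentalCoroutinesApi::class)
suspend fun callsBeforeFirstReceive(): Int = coroutineScope {
    val source = Source()
    val channel = produce<Int> {
        while (true) send(source.computeNextValue())
    }
    delay(100)                 // nobody has called receive() yet
    val calls = source.calls   // read while the producer is suspended in send
    channel.cancel()           // stop the producer so the scope can complete
    calls
}

// Cold: the block runs only while a terminal operator is collecting.
suspend fun callsBeforeAndAfterCollect(): Pair<Int, Int> {
    val source = Source()
    val numbers = flow<Int> { while (true) emit(source.computeNextValue()) }
    val before = source.calls  // the flow is built, but nothing has run
    numbers.take(2).toList()
    return before to source.calls
}

fun main() = runBlocking {
    println("hot, before any receive: ${callsBeforeFirstReceive()}")
    println("cold, before/after collecting two values: ${callsBeforeAndAfterCollect()}")
}
```

With the default rendezvous channel the hot producer should already have computed a value by the time the count is read, while the cold flow computes nothing until toList() starts collecting.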

There are the following related discussions:

@elizarov
Contributor Author

Let me add here that we are looking at an asynchronous analogue of Kotlin's standard Sequence. We can already write the following code in Kotlin:

val s = buildSequence<Int> { 
    while (true) {
        val x = computeNextValue()
        yield(x)
    } 
}

and this code is perfectly lazy in the sense that computeNextValue is not invoked until the sequence is iterated, and only the values that are actually requested are ever computed. However, we cannot do arbitrary suspension from inside buildSequence. We cannot delay, we cannot do asynchronous network requests, etc.
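
A small runnable sketch of that laziness, using only the standard library. It assumes Kotlin 1.3 or later, where buildSequence is available under the name sequence; takeLazily and the counter are hypothetical helpers added for illustration.

```kotlin
// Returns the values taken and how many times the stand-in computation actually ran.
fun takeLazily(n: Int): Pair<List<Int>, Int> {
    var calls = 0
    fun computeNextValue(): Int = ++calls   // hypothetical stand-in

    val s = sequence {
        while (true) {
            yield(computeNextValue())
        }
    }
    check(calls == 0)                       // building the sequence computes nothing
    val values = s.take(n).toList()         // only now does the block run, n times
    return values to calls
}

fun main() {
    val (values, calls) = takeLazily(3)
    println("values=$values, calls=$calls") // prints "values=[1, 2, 3], calls=3"
}
```

Calling a suspending function such as delay inside the sequence block does not compile, because the builder's scope restricts suspension to yield and yieldAll. That restriction is exactly the gap described above.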

@pull-vert

pull-vert commented Mar 3, 2018

Hi,
I was writing a kotlinx-coroutines reactive (= cold) library based on reactive-streams. But following Elizarov's advice on #201, I switched to a pure Kotlin suspend version, strongly inspired by your StreamBenchmarks project.
Here is the library : Reactivity

It provides 2 cold producers (Reactor inspiration) :

  • Multi for multi values
  • Solo for single value

As they are reactive objects, they start producing items only when a client calls a Terminal (final/consuming) operation function.

Reactivity is a multiplatform project, with common and platform specific tests and extensions. Providing for example platform specific Solo.toPromise in JS, Solo.toCompletableFuture or Stream.toMulti in JVM (in JDK8 project).
There are only a few operators right now (map, filter, delay) but they can be easily added as they are very simple.

I would be really happy if Reactivity can save time, or serve as source of inspiration for this issue

@ZakTaccardi

ZakTaccardi commented Mar 9, 2018

The biggest issue with Channel<T> in my experience is that .openSubscription() has to be called before operators can be applied. I personally really liked LiveData<T>'s approach.

It shipped with LiveData<T>, MutableLiveData<T>, and MediatorLiveData<T>, and two transformations, .map() and .switchMap{ }.

Implementing custom operators is easy, just add an extension function.

Unfortunately, LiveData<T> falls completely flat because it's not thread safe and highly coupled to the main thread.

Ultimately, my use case is that I need a lightweight RxJava for scenarios where I can't use RxJava but want a reactive-state based architecture, and I am hoping coroutines can solve this problem.

@elizarov
Contributor Author

@ScottPierce We can open a separate issue about LinkedListBroadcastChannel (that's a broadcast channel implementation with UNLIMITED capacity) if that helps you. What's your specific use-case about it? Go ahead, create a separate issue and describe your use-case for it.

@elizarov
Contributor Author

elizarov commented Mar 15, 2018

@pull-vert I'm not a big fan of a Single-like abstraction. Solo is a great name, btw, but if we include it in the library, then we'll have three slightly different ways to asynchronously perform an operation that returns a value:

  • async { something } -- starts computation in background immediately to produce result later.
  • async(start = CoroutineStart.LAZY) { something } -- starts computation only when it is requested, sharing result with subsequent requests.
  • solo { something } -- starts computation when it is requested and every time it is requested do it again from scratch.

They are all different ways, but do we really need an abstraction for that last one? Consider this definition:

typealias Solo<T> = suspend () -> T

Isn't this functional type the Solo we are looking for? Do we really need to give it some other name like Solo? We can always declare all the extensions we might need directly on top of that suspend () -> T functional type.

Let me also quote Erik Meijer's tweet here: https://twitter.com/headinthebox/status/971492821151580160

@JakeWharton
Contributor

I'd prefer a real type for a few reasons.

The typealias trick can go really far for lots of abstractions but that doesn't always mean it's appropriate. Would you do the same for a multi-value source in suspend (Consumer<T>) -> Unit or would it get a proper type?

These types can and will leak into Java and I'd much prefer a semantically named type than an arbitrary complex function type that only the Kotlin metadata can disambiguate. Ignoring the fact that there's a strong possibility these types will be partially usable from Java, the ability to do simple things like instanceof and isAssignableFrom checks against a real type will help this abstraction to be wired into libraries like Retrofit, JAX RS, etc. Even just having them able to be received by Java code and converted into something else (like an RxJava type) goes a long way.

A Solo<T> is a specialization of a Many<T> (or whatever you want to call it) and thus should be polymorphic with respect to it. It's really convenient being able to return a Solo<T> from a call to a Many<T>.flatMap( T -> Many<T> ) function without requiring overloads, for example.

Erik's tweet does not make sense. Single is a factory of a coroutine/async+awaitable. Your typealias is evidence of this. And since the context of the tweet was Java, when we rasterize the typealias into a proper type (as the Java language requires) and desugar coroutines into their underlying form of callbacks (as the Java language requires) we're left with the exact shape of Rx's single so they're actually exactly the same thing.

@elizarov
Contributor Author

elizarov commented Mar 15, 2018

I question the very need of Single/Solo. The typealias digression was just to demonstrate what this concept actually does. I question the very use-cases for Single/Solo. Why would you ever use it? Why would you need a dedicated type to denote a reference to a computation that reruns every time you ask for it? Why would you need this level of indirection? Your code will be much easier to understand if you use suspending functions directly.

To illustrate. Instead of:

interface A { fun doSomething(): Single<T> } 

write

interface A { suspend fun doSomething(): T }

Instead of:

fun foo() = // return Single<R>
    doSomething().map { it.transform() }

do

suspend fun foo() = // return R
    doSomething().transform()  

you can continue this example with flatMap, etc, etc. The code that does not use Single/Solo will be invariably more direct and easier to understand.

On the other hand, we need a dedicated abstraction for asynchronous streams of values simply because there is no other way to represent it in Kotlin, but via a separate type.

@JakeWharton
Contributor

JakeWharton commented Mar 15, 2018 via email

@akarnokd

Isn't this

suspend fun foo() = // return R
    doSomething().transform()  

eagerly executing doSomething and you call a method on the returned value T?

I'd think that given a suspend () -> T, you'd want to create a new function suspend () -> R that when invoked, calls the previous suspend function only then, transform its results and returns that as the value R, right?
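
A minimal sketch of the operator described here, assuming kotlinx.coroutines is on the classpath for runBlocking. The map extension on suspend () -> T and the mapDemo helper are hypothetical, not part of any library.

```kotlin
import kotlinx.coroutines.runBlocking

// Hypothetical operator: builds a new deferred computation and runs nothing by itself.
fun <T, R> (suspend () -> T).map(transform: suspend (T) -> R): suspend () -> R {
    val source = this
    return { transform(source()) }   // the source is invoked only when the result is invoked
}

// Returns: runs before invoking, first result, second result, runs after two invocations.
suspend fun mapDemo(): List<Int> {
    var runs = 0
    val doSomething: suspend () -> Int = { runs++; 21 }
    val doubled = doSomething.map { it * 2 }
    val runsBefore = runs            // still 0: map did not call doSomething
    val first = doubled()
    val second = doubled()           // reruns the source from scratch
    return listOf(runsBefore, first, second, runs)
}

fun main() = runBlocking {
    println(mapDemo())               // prints [0, 42, 42, 2]
}
```

The wrapper keeps the cold behaviour of the original function type: each invocation of the mapped function reruns the source.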

@JakeWharton
Contributor

Yes, but you wouldn't invoke the method until you needed the value (i.e., when you'd otherwise subscribe). When it's suspend functions all the way down you don't need the abstraction because you can just call the suspending function over and over.

@pull-vert

I like the Solo = Single value cold emitter because it can provide specific Operators, and some intermediate (= cold/not consuming) Operators on Multi can return a Solo :

fun <E> Solo<E>.merge(vararg others: Solo<E>): Multi<E>

inline fun <E, R> Multi<E>.reduce(initial: R, crossinline operation: (acc: R, E) -> R): Solo<R>

fun <E> Multi<E>.first(): Solo<E>

See https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html to see some Operators of Flux that return Mono.

After that some Terminal (final/consuming) Solo specific extensions I mentioned before are useful :

fun <T> Solo<T>.toPromise(): Promise<T>

fun <E> Solo<E>.toCompletableFuture(
  coroutineContext: CoroutineContext = DefaultDispatcher
): CompletableFuture<E>

@JakeWharton
Contributor

JakeWharton commented Mar 15, 2018 via email

@fvasco
Contributor

fvasco commented Mar 16, 2018

Hi,
I tried to picture some code to understand better the final shape.

I played with Solo, using a type alias can lead to some interesting collateral effects.

typealias AsyncCallable<T> = suspend () -> T

fun <E> AsyncCallable<E>.toCompletableFuture(
        coroutineContext: CoroutineContext = DefaultDispatcher
): CompletableFuture<E> = future(coroutineContext) { invoke() }

fun main(vararg args: String) = runBlocking {
    val deferred = async { "Hello" }
    val deferredFuture = deferred::await.toCompletableFuture()
}

Then I tried to implement the concatMap, but I don't know if it is possible to do better.

interface AsyncSource<E> {
    suspend fun consumeEach(consumer: suspend (E) -> Unit)
}

fun <E> buildAsyncSource(builder: suspend AsyncSourceBuilder<E>.() -> Unit): AsyncSource<E> = TODO()

interface AsyncSourceBuilder<E> {
    suspend fun yield(item: E)
    suspend fun yieldAll(items: AsyncSource<E>) = items.consumeEach { yield(it) }
}


suspend fun <E> AsyncSource<E>.concatMap(block: suspend (E) -> AsyncSource<E>): AsyncSource<E> =
        buildAsyncSource {
            this@concatMap.consumeEach { item ->
                yieldAll(block(item))
            }
        }
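
One possible way to fill in the TODO above, offered as a sketch rather than as the intended design: the builder block is rerun for every consumer and each yielded item is forwarded straight to that consumer. The interfaces are repeated so the example is self-contained; concatMap is declared without suspend here because it only builds a new source, and concatMapDemo is a hypothetical helper.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

interface AsyncSource<E> {
    suspend fun consumeEach(consumer: suspend (E) -> Unit)
}

interface AsyncSourceBuilder<E> {
    suspend fun yield(item: E)
    suspend fun yieldAll(items: AsyncSource<E>) = items.consumeEach { yield(it) }
}

// Assumption: no buffering and no extra coroutine; the block runs inside consumeEach.
fun <E> buildAsyncSource(builder: suspend AsyncSourceBuilder<E>.() -> Unit): AsyncSource<E> =
    object : AsyncSource<E> {
        override suspend fun consumeEach(consumer: suspend (E) -> Unit) {
            val scope = object : AsyncSourceBuilder<E> {
                override suspend fun yield(item: E) = consumer(item)
            }
            scope.builder()
        }
    }

fun <E> AsyncSource<E>.concatMap(block: suspend (E) -> AsyncSource<E>): AsyncSource<E> =
    buildAsyncSource {
        this@concatMap.consumeEach { item -> yieldAll(block(item)) }
    }

suspend fun concatMapDemo(): List<Int> {
    val source = buildAsyncSource<Int> {
        for (i in 1..3) {
            delay(10)            // arbitrary suspension is allowed inside the builder
            yield(i)
        }
    }
    val out = mutableListOf<Int>()
    source
        .concatMap { n -> buildAsyncSource<Int> { yield(n); yield(n * 10) } }
        .consumeEach { out.add(it) }
    return out
}

fun main() = runBlocking {
    println(concatMapDemo())     // prints [1, 10, 2, 20, 3, 30]
}
```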

@elizarov
Contributor Author

elizarov commented Mar 16, 2018

@pull-vert In kotlinx.coroutines terminal operations on a cold reactive streams (let's call it Multi for this example) that you mention will have the following signatures:

inline suspend fun <E, R> Multi<E>.reduce(initial: R, crossinline operation: (acc: R, E) -> R): R
suspend fun <E> Multi<E>.first(): E

@JakeWharton I don't think we need any kind of "multiple singles to a multi" (merge) at all. Where can those "multiple singles" come from? They can appear in a situation like this:

suspend fun listUsers(): List<User>
suspend fun getUserProfile(userId: UserId): UserProfile

Here we want to get a list of users, then get a profile for each user. In reactive Java world we'd define getUserProfile(user: UserId): Single<UserProfile> and we'd have to use some kind of merging operation on those Single types. With coroutines all we need is a plain map:

val profiles: List<UserProfile> = listUsers().map { getUserProfile(it.userId) } 

Note, that sometimes you'd want to perform this map "in parallel" (but be careful that unlimited parallelism can exhaust your local resources and/or DOS your server unless you take appropriate precautions). We have separate issues about that (see #172 and #180).
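
A sketch of both variants, assuming a kotlinx.coroutines version that provides awaitAll and kotlinx.coroutines.sync.Semaphore. The data classes, the stand-in implementations of listUsers and getUserProfile, and the maxConcurrency parameter are hypothetical.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withPermit

// Hypothetical domain types and stand-in implementations.
data class UserId(val value: Int)
data class User(val userId: UserId)
data class UserProfile(val userId: UserId, val name: String)

suspend fun listUsers(): List<User> = (1..5).map { User(UserId(it)) }

suspend fun getUserProfile(userId: UserId): UserProfile {
    delay(50)   // pretend this is a network call
    return UserProfile(userId, "user-${userId.value}")
}

// Sequential: a plain map works because the inline lambda may suspend.
suspend fun loadSequentially(): List<UserProfile> =
    listUsers().map { getUserProfile(it.userId) }

// Concurrent, with at most maxConcurrency requests in flight at once.
suspend fun loadConcurrently(maxConcurrency: Int = 2): List<UserProfile> = coroutineScope {
    val permits = Semaphore(maxConcurrency)
    listUsers()
        .map { user -> async { permits.withPermit { getUserProfile(user.userId) } } }
        .awaitAll()   // results come back in the original order
}

fun main() = runBlocking {
    println(loadSequentially().map { it.name })
    println(loadConcurrently().map { it.name })
}
```

The semaphore is one way to take the "appropriate precautions" mentioned above; a fixed pool of worker coroutines reading from a channel would be another.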

@pull-vert

pull-vert commented Mar 16, 2018

@elizarov I understand and agree with your example for terminal/consuming Operations.
But for me the "first" Operator must be Intermediate, that will suspend and send the Single first value of a Multi cold Source but should not be terminal/consuming.

I would prefer the resulting Solo/Single value to be cold and not consuming, to be able to chain Operators until the first terminal/consuming Operator (can have multiple consumers, each will receive all elements).

For example, a stupid Reactive Operator chain:

Multi
                .fromValues("one", "two", "three")
                .filter { it.startsWith("t") } // Intermediate cold = returns Multi with "two" and "three"
                .first() // Intermediate cold = returns a Mono with "two"
                .concatMap { it.splitAndDelay(15)} // Intermediate cold = returns a Multi with "t", "w" and "o" (adding a 15ms delay between each send)
                .consumeEach { println(it)} // terminal/consuming operation printing "t", "w" then "o"

@elizarov
Contributor Author

elizarov commented Mar 16, 2018

@pull-vert In a world of channels and cold streams we might have the following function defined in some API:

suspend fun subscribeToSomeStrings(): ReceiveChannel<String> 

It performs an operation to subscribe to the data stream (establishing network connection, etc) and returns a channel of strings flowing from the server. It is hot. There is not much reason to defer the actual subscription until the terminal operation. It is better to "fail-fast" (return an error immediately) if we have a problem establishing the communication channel to the server.

The actual processing of this hot data stream would use cold channels in intermediate steps, like this:

subscribeToSomeStrings() // establishes connection to datasource & returns a hot stream
    .filter { it.startsWith("t") } // intermediate - returns cold stream (no coroutine to filter it yet)
    .first() // terminal, starts filtering incoming stream, returns a String
    .split() // a regular operator on String, no need to wrap this operator
    .delayEach(15) // intermediate - returns cold stream with delays between each items
    .consumeEach { ... } // terminal

There is not much need for special "glue" operators that work on Single/Solo. You can apply regular functions (like String.split()) directly on the values that you have. Interestingly, if we replace the first line with a function that returns List<String> instead of ReceiveChannel<String>, then the code will compile and continue to work. It would just become more eager in processing the data.

And that is the whole beauty. You can take a whole synchronous data processing pipeline with intermediate operators like filter and map and terminal operators like first, and introduce asynchrony into it (maybe make a source asynchronous, or maybe introduce some asynchronous request inside one of the map or filter operators), and it will continue to work without you having to change a single line of code. The IDE will helpfully highlight with suspension marks in the gutter which pieces of your pipeline have become asynchronous.

@pull-vert

@elizarov Thanks for this clarification. I now understand your vision for introducing cold Intermediate Operators to Channels without a new Coroutine for each Operator, which should perform a lot better than the current implementation.

I am still a bit confused by this chaining of mixed hot and cold Operators, it will require good documentation to be sure in what state (hot instantly usable or cold that will require terminal consuming operator) we are at every step of the chain.
I wonder how this will interact with existing pure cold reactive libraries (rxjs, rxjava, reactor, java 9 Flow...), for example if we want to use kotlinx-coroutines between a cold Observable rxjava reactive Couchbase driver as Input and return a cold Mono as Spring 5 output ?

If I understand your example delayEach or map cold intermediate Operator will have to be declared as Extension on kotlinx ReceiveChannel, java 8 Stream, kotlin Sequence, kotlin Collection, kotlin Array, maybe more.

@elizarov
Contributor Author

@pull-vert The mental model is simple. There are no cold operators. You just "setup your pipeline" (that is what cold operators do) that is later executed by the terminal operator. Since every chain of operators ultimately ends in a terminal operator, you don't really care what happens in between. It just works.

With respect to the kind of operators we need to provide, we have huge advantage over reactive libraries because we only need to define basic ones like map and filter. For example, there is no need to have ready-to-use delayEach(time) in the library, because it can be trivially implemented with .map { delay(time); it }
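
A minimal sketch of that idea, written against the Flow API that this issue was eventually closed with (an assumption, since Flow did not exist yet at this point in the thread). delayEachBy is a hypothetical name chosen to avoid implying a library function.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helper built only from map and delay, as suggested above.
fun <T> Flow<T>.delayEachBy(timeMillis: Long): Flow<T> =
    map { delay(timeMillis); it }

fun main() = runBlocking {
    val values = flowOf("t", "w", "o").delayEachBy(15).toList()
    println(values)   // prints [t, w, o], each element arriving about 15 ms after the previous one
}
```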

@pull-vert

@elizarov Thanks again now I see and understand exactly the direction for this cold stream operators topic for kotlinx-coroutines.

I will continue to enrich my pure cold reactive library for fun (strongly based on kotlinx-coroutines), and of course follow what is going on here!

@arkivanov

As an alternative we are developing Reaktive library: https://github.com/badoo/Reaktive

@pakoito

pakoito commented Apr 10, 2019

As an alternative we are developing Reaktive library: https://github.com/badoo/Reaktive

That's really neat! Do you plan to add support for suspend functions and the concurrent operators? I need my takeUntil :D

@arkivanov

As an alternative we are developing Reaktive library: https://github.com/badoo/Reaktive

That's really neat! Do you plan to add support for suspend functions and the concurrent operators? I need my takeUntil :D

Thanks for the feedback 😊 suspend functions are not planned, and there are already a lot of operators like combineLatest, zip, merge, sample, debounce, throttle. I'll put takeUntil into the queue. You can also open an issue for that to keep track 😉

@PaulWoitaschek
Contributor

@fvasco If you remove the delayEach, it crashes:
"Exception in thread "main" kotlinx.coroutines.channels.ClosedReceiveChannelException: Channel was closed".
If you change the delay to smaller numbers, i.e.

    val firstFlow = flowOf(1, 2, 3, 4).delayEach(10)
    val secondFlow = flowOf("a", "b", "c").delayEach(15)

You don't end up with the latest values:

1a
2a
2b
3b
4b

@fvasco
Contributor

fvasco commented Apr 10, 2019

Good catch @PaulWoitaschek! Don't use it, I'm not interested to engage myself on this issue.

@jcornaz
Contributor

jcornaz commented Apr 10, 2019

I think it would be nice if FlowCollector was a CoroutineScope.

Then one could always safely start a coroutine from the flow builder and be sure that:

  • the flow is not completed before all children coroutines are completed
  • the children coroutines are cancelled as soon as the flow is cancelled (for instance, because of a terminal operator like take(n) or first(), or any operator that does not wait until the end of the flow).

@elizarov
Contributor Author

elizarov commented Apr 10, 2019

@jcornaz Making or not making FlowCollector a CoroutineScope is a performance trade-off. Simple, sequential flow-chains do not need a coroutine scope, because they are sequential and involve no concurrency, so providing a CoroutineScope impl for them is going to be just a performance overhead (extra allocated objects). It would benefit more complex operators that involve concurrency, since otherwise these complex operators need to use coroutineScope { ... } themselves.

So far we are leaning to a performance-conscious choice -- keep simple operators faster, and it is Ok if more complex operators have to use coroutineScope occasionally when they need it.
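
A sketch of what "use coroutineScope occasionally" can look like in a custom operator, assuming the released Flow API. loadName, loadScore and withDetails are hypothetical names.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical lookups standing in for two independent asynchronous requests.
suspend fun loadName(id: Int): String { delay(20); return "name-$id" }
suspend fun loadScore(id: Int): Int { delay(20); return id * 10 }

// The operator opens a scope only where it needs concurrency. emit is still
// called from the collecting coroutine, after both children have completed.
fun Flow<Int>.withDetails(): Flow<String> = flow {
    collect { id ->
        val line = coroutineScope {
            val name = async { loadName(id) }
            val score = async { loadScore(id) }
            "${name.await()}=${score.await()}"
        }
        emit(line)
    }
}

fun main() = runBlocking {
    println(flowOf(1, 2).withDetails().toList())   // prints [name-1=10, name-2=20]
}
```

If the collector is cancelled, for example by take(1), the coroutineScope call is cancelled with it, so the two children do not outlive the flow. Simple operators such as map pay nothing for this.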

@fvasco
Contributor

fvasco commented Apr 10, 2019

@jcornaz I do not agree with you: it is possible to build a coroutineScope inside the flow builder (with all the proposed benefits).
Moreover, most of them depend on a Job; a pure Flow concept should not depend on any Job or Dispatcher, I think.

This is why I am reluctant about the combineXxx operator: it can easily be built for Channel, but not for Flow, so it should be implemented for Channel instead of Flow.

@jcornaz
Contributor

jcornaz commented Apr 10, 2019

@elizarov. It makes sense, thanks for the explanation.

@fvasco

Moreover, most of them depend on a Job; a pure Flow concept should not depend on any Job or Dispatcher, I think.

I agree.

This is why I am reluctant about the combineXxx operator: it can easily be built for Channel, but not for Flow, so it should be implemented for Channel instead of Flow.

I think it is as easy/hard to build for both, and with the same benefits/problems (like the order of the output not being guaranteed). The only difference is that ReceiveChannel is less safe because it represents an open resource. Which is why I don't agree there. I think all operator could and should be available on Flow instead of ReceiveChannel because Flow is a safer API.

On a completely different topic:
Why doesn't FlowCollector have declaration-site variance?

Here is what I'd expect:

@FlowPreview
public interface FlowCollector<in T> {
    public suspend fun emit(value: T)
}

But here is what we have:

@FlowPreview
public interface FlowCollector<T> {
    public suspend fun emit(value: T)
}
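
A small stdlib-only sketch of why the `in` modifier matters. The two interfaces are hypothetical mirrors of the declarations above, made non-suspending for brevity; variance works the same way for suspending members.

```kotlin
// Hypothetical mirrors of the two declarations above.
interface InvariantCollector<T> { fun emit(value: T) }
interface ContravariantCollector<in T> { fun emit(value: T) }

// With declaration-site variance, a collector of Any is accepted where a
// collector of String is expected.
fun emitGreeting(collector: ContravariantCollector<String>) = collector.emit("hello")

// Without it, every call site has to ask for the projection explicitly.
fun emitGreetingInvariant(collector: InvariantCollector<in String>) = collector.emit("hello")

fun collectIntoAnySinks(): List<Any> {
    val seen = mutableListOf<Any>()
    val contravariantSink = object : ContravariantCollector<Any> {
        override fun emit(value: Any) { seen.add(value) }
    }
    val invariantSink = object : InvariantCollector<Any> {
        override fun emit(value: Any) { seen.add(value) }
    }
    emitGreeting(contravariantSink)        // compiles only because of `in T`
    emitGreetingInvariant(invariantSink)   // compiles only because of the use-site `in String`
    return seen
}

fun main() {
    println(collectIntoAnySinks())         // prints [hello, hello]
}
```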

@qwwdfsad
Member

qwwdfsad commented Apr 10, 2019

Why doesn't FlowCollector have declaration-site variance?

It's an oversight, I will fix it in flow-leftovers branch (target: 1.2.0)

What about combineLatest: it is implementable for both Flow and Channel in more or less the same way.

No new operators will be introduced to channels, existing ones will be eventually deprecated and hidden. Channel is a synchronization primitive, analogue of blocking queue in coroutines world, not a tool to build reactive pipelines on top of.

@jcornaz
Contributor

jcornaz commented Apr 10, 2019

@elizarov

Making or not making FlowCollector a CoroutineScope is a performance trade-off. Simple, sequential flow-chains do not need a coroutine scope, because they are sequential and involve no concurrency, so providing a CoroutineScope impl for them is going to be just a performance overhead (extra allocated objects). It would benefit more complex operators that involve concurrency, since otherwise these complex operators need to use coroutineScope { ... } themselves.

Ok for the flow builder, but couldn't the block of flowViaChannel get a CoroutineScope?

  1. It makes sense I think because if one explicitly wants to create a flow via a channel, it is very likely that they will use coroutines. (After all, a Channel is nothing but a communication primitive for coroutines.)
  2. Performance concern does not apply there, because the implementation of flowViaChannel already needs to use coroutineScope. So it could just be passed to the block without any overhead.

EDIT: Actually, in the case of a flowViaChannel wouldn't it make sense to get a ProducerScope?

@qwwdfsad
Member

qwwdfsad commented Apr 10, 2019

@jcornaz I will answer instead of Roman.

This is actually a good idea, thanks for pointing it out.
We have some ideas about optional CoroutineScope in flow builder and this is very similar to flowViaChannel. I will focus on this after 1.2.0 and there are high chances flowViaChannel will have scope receiver in 1.2.1.

It is hard to track ideas and feature requests under this issue, we will close it right after 1.2.0. I've created #1081 to track progress on that

@fvasco
Contributor

fvasco commented Apr 12, 2019

Some personal consideration regarding Limit.kt.

drop counts elements; once count is reached, there is no reason to keep incrementing skipped.

Should take work with count = 0?

Lastly in concatenate method the inner variable can be declared instead of the implicit it.

@qwwdfsad
Member

qwwdfsad commented Apr 13, 2019

Closing this one as fixed in 1.2.0.
There are still a lot of open questions, missing operators and various design problems, which are hard to track under this issue. If you feel that something is wrong or some functionality is missing, please create a separate issue for that.

Flow looks like both Sequence and Flowable (or Flux, or Publisher) and there is a lot of temptation to create issues like "Please add operator X because I've got used to it in $FRAMEWORK".
But please explain why you need such an operator and what domain-specific problem it solves. We ask not because we don't want to add more operators, but because we want Flow to solve real problems in the most elegant way, and that is not always with the operators people are used to. Understanding how Flow is used is crucial for our further design work.

A good example of how things are different in Flow is flatMap operator. It is hard to compose reactive applications without it but in coroutines world, we have suspending map operator, so most of the combination issues can be solved by a simple suspension, thus greatly decreasing use of flatMap or merge.

@pakoito

pakoito commented Apr 13, 2019

A good example of how things are different in Flow is flatMap operator. It is hard to compose reactive applications without it but in coroutines world, we have suspending map operator, so most of the combination issues can be solved by a simple suspension, thus greatly decreasing use of flatMap or merge.

Say then that you renamed flatMap to map, or that you made map behave like flatMap. Using incorrect nomenclature, even if it's to point out how superficially simple your solution is, may confuse users across frameworks because they have some preconceived expectations. One of them being that the lambda passed to a map function cannot and should not throw with the real-world consequence of mismatched expectations on different APIs (i.e. Set, List) that'll lead to crashes.

@zach-klippenstein
Contributor

In some non-suspending reactive libraries, map and flatMap are different in two ways:

  1. map is one-to-one (one value in, one value out), and flatMap is one-to-many.
  2. map is synchronous (must return immediately), and flatMap can be asynchronous, since it works with stream types.

In this library, the two operators still have the first distinction, but not the second. RxJava, for example, has a flatMapSingle operator that is still one-to-one but allows the result value to be produced asynchronously. In Flow, you can just use a simple map for that. flatMap is still useful if you need to map a single input value to multiple output values.

I think this naming is a good choice for the library because the cardinality difference is the more important one – operators like flatMap and flatten are so-called because they take a two-dimensional thing (a list of lists, a stream of streams), and "flatten" it into a one-dimensional thing (list/stream of single items). This is also the only distinguishing property in other APIs like Kotlin's list and sequence, which don't deal with asynchrony at all.
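
A short sketch of that distinction, assuming the released Flow API. fetchTitle, titles and words are hypothetical names, and flatMapConcat is used here as the sequential flattening operator; it may require an opt-in depending on the library version.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical asynchronous lookup.
suspend fun fetchTitle(id: Int): String { delay(10); return "title-$id" }

// One-to-one but asynchronous: a suspending map is enough.
fun titles(ids: Flow<Int>): Flow<String> = ids.map { fetchTitle(it) }

// One-to-many: each input becomes a flow that is flattened into the result.
@OptIn(ExperimentalCoroutinesApi::class)
fun words(lines: Flow<String>): Flow<String> =
    lines.flatMapConcat { line -> line.split(" ").asFlow() }

fun main() = runBlocking {
    println(titles(flowOf(1, 2)).toList())        // prints [title-1, title-2]
    println(words(flowOf("a b", "c")).toList())   // prints [a, b, c]
}
```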

@pakoito

pakoito commented Apr 13, 2019

In this library, the two operators still have the first distinction, but not the second.

I'm not sure I follow, you can easily come up with examples that jump threads, for example map { async(CommonPool) { throw RuntimeException() }.await() } is asynchronous and legitimately throws. Each value can even jump to a different thread and require non-blocking await for the current thread.

In other libraries, assuming you're following the spec, you cannot fail or jump threads on map because it's synchronous and blocking and you cannot construct an "error value" such as a throwing coroutine or Observable.error(). That's what flatMap is used for. Also, flatMap is not limited to multiple values on a stream, for example rx.Single has it.

flatMap exists to represent sequentiality of this wrapped/suspended value followed by another. There's some semantics on how to flatten streams so we came up with new names for them.

My small concern here is that by making map suspended you're recreating flatMap under another name and not setting the right expectations, nor is this library simpler for it.

Maybe I'm being too picky about it 🤪 but there are precedents where this distinction mattered in other languages.

@JakeWharton
Contributor

JakeWharton commented Apr 13, 2019 via email

@pakoito

pakoito commented Apr 13, 2019

and_then then? :D

@zach-klippenstein
Contributor

I realize this response is a bit late, but just wanted to get some clarification on @elizarov's comment above:

publish is already there -- it is called produceIn. It is a terminal operator for a flow that pushes data from it to the outgoing channel (you can convert that channel to flow again). replay analogue is broadcastIn, but it does not yet support unlimited-size buffer.

I don't see how produceIn is equivalent to publish. publish and replay both do multicasting, they differ in behavior upon subscription. As far as I can tell, only broadcastIn does multicasting. Neither have flexible replaying behavior (a conflated broadcast channel is similar to replay(1).refCount()).

@streetsofboston

@zach-klippenstein
You may be interested in this issue:
#1086

loob2dev pushed a commit to loob2dev/spring-r2dbc that referenced this issue Apr 3, 2021
This commit introduces Coroutines support for `DatabaseClient`
functional API via Kotlin extensions that provide suspendable
functions prefixed by `await` for `Mono` based APIs.

Extensions for `Flux` will be added when Kotlin/kotlinx.coroutines#254
will be fixed.

It also provides `asType<Foo>()` extensions useful for Reactive API
as well.

Original pull request: #63.
rayrobert398 added a commit to rayrobert398/spring-data-Mobile-App-Developer-Java that referenced this issue Oct 3, 2022
This commit introduces Coroutines support for `DatabaseClient`
functional API via Kotlin extensions that provide suspendable
functions prefixed by `await` for `Mono` based APIs.

Extensions for `Flux` will be added when Kotlin/kotlinx.coroutines#254
will be fixed.

It also provides `asType<Foo>()` extensions useful for Reactive API
as well.

Original pull request: #63.