-
Notifications
You must be signed in to change notification settings - Fork 1.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Introduce SharedFlow and sharing operators #2069
Conversation
Yes, just can think of it this way. The correct mental model, though, is that until the first subscriber subscribers there's always an internal very-fast subscriber that consumes all subscribed messages immediately (so that |
This PR is now complete, squashed, and rebased onto the |
That's clear and makes sense for the |
* [emit][MutableSharedFlow.emit] does not suspend while there is a buffer space remaining (optional, cannot be negative, defaults to zero). | ||
* @param onBufferOverflow configures an action on buffer overflow (optional, defaults to | ||
* [suspending][BufferOverflow.SUSPEND] attempt to [emit][MutableSharedFlow.emit] a value, | ||
* supported only when `replay > 0` or `extraBufferCapacity > 0`). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What does "supported" mean here? If replay == 0
, is this value just ignored? DROP_LATEST
and DROP_OLDEST
seem like they would both mean emitters would never suspend but otherwise be equivalent, but SUSPEND
could still suspend emitters.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It means that IllegalArgumentException
is thrown if you try to do replay = 0
and onBufferOverflow != SUSPEND
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see the general "This function throws [IllegalArgumentException] on unsupported values of parameters of combinations thereof" warning at above, but I think it would be more clear to explicitly put your reply to my comment in the docs. It's a little more verbose, but given the complexity of the interactions of the various values of these parameters I think it would be worth it.
|
While looking through the documentation, I came across quite a few minor grammatical errors. If you like I can point them out here, but maybe it can benefit from some proofreading. |
You can directly point to places in the code via GitHub interface (and "Files Changed" tab) and even provide suggested replacements. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here is the grammar review of the documentation and comments. Most issues are quite minor, but there are a lot of them. Many are probably subjective though, so feel free to change or ignore as seems appropriate.
I have provided suggested replacements, but did not make any adjustments for line wrapping, which might also need to be accounted for when implementing the changes.
kotlinx-coroutines-core/common/test/flow/sharing/ShareInTest.kt
Outdated
Show resolved
Hide resolved
b2f2c1f
to
78e7e75
Compare
Any chance to have this reviewed by @qwwdfsad and merged before Kotlin 1.4 is released? ;) |
4e1d3f5
to
6566e1e
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
reactive/kotlinx-coroutines-reactive/test/PublisherAsFlowTest.kt
Outdated
Show resolved
Hide resolved
synchronized(this) { | ||
val oldState = _state.value | ||
if (expectedState != null && oldState != expectedState) return false // CAS support | ||
if (oldState == newState) return true // Don't do anything if value is not changing, but CAS -> true |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oldState
is volatile and can be read without a lock.
Its value can be changed during a comparison, but it's inseparable from an arbitrary interleaving of two updates.
Tho oldValue
still has to be re-read under the lock, so maybe it's not worth it
Last minute change: let's mark |
This is amazing! |
@prithivraj Look at the label, that will answer your first question. |
* Introduce SharedFlow and sharing operators Summary of changes: * SharedFlow, MutableSharedFlow and its constructor. * StateFlow implements SharedFlow. * SharedFlow.onSubscription operator, clarified docs in other onXxx operators. * BufferOverflow strategy in kotlinx.coroutines.channels package. * shareIn and stateIn operators and SharingStarted strategies for them. * SharedFlow.flowOn error lint (up from StateFlow). * Precise cancellable() operator fusion. * Precise distinctUntilChanged() operator fusion. * StateFlow.compareAndSet function. * asStateFlow and asSharedFlow read-only view functions. * Consistently clarified docs on cold vs hot flows. * Future deprecation notice for BroadcastChannel, ConflatedBroadcastChannel, broadcast, and broadcastIn. * Channel(...) constructor function has onBufferOverflow parameter. * buffer(...) operator has onBufferOverflow parameter. * shareIn/stateIn buffer and overflow strategy are configured via upstream buffer operators. * shareIn/stateIn fuse with upstream flowOn for more efficient execution. * conflate() is implemented as buffer(onBufferOverflow=KEEP_LATEST), non-suspending strategies are reasonably supported with 0 and default capacities. * Added reactive operator migration hints. * WhileSubscribed with kotlin.time.Duration params Fixes Kotlin#2034 Fixes Kotlin#2047 Co-authored-by: Ibraheem Zaman <1zaman@users.noreply.github.com> Co-authored-by: Thomas Vos <thomasjsvos@gmail.com> Co-authored-by: Travis Wyatt <travis.i.wyatt@gmail.com>
* Introduce SharedFlow and sharing operators Summary of changes: * SharedFlow, MutableSharedFlow and its constructor. * StateFlow implements SharedFlow. * SharedFlow.onSubscription operator, clarified docs in other onXxx operators. * BufferOverflow strategy in kotlinx.coroutines.channels package. * shareIn and stateIn operators and SharingStarted strategies for them. * SharedFlow.flowOn error lint (up from StateFlow). * Precise cancellable() operator fusion. * Precise distinctUntilChanged() operator fusion. * StateFlow.compareAndSet function. * asStateFlow and asSharedFlow read-only view functions. * Consistently clarified docs on cold vs hot flows. * Future deprecation notice for BroadcastChannel, ConflatedBroadcastChannel, broadcast, and broadcastIn. * Channel(...) constructor function has onBufferOverflow parameter. * buffer(...) operator has onBufferOverflow parameter. * shareIn/stateIn buffer and overflow strategy are configured via upstream buffer operators. * shareIn/stateIn fuse with upstream flowOn for more efficient execution. * conflate() is implemented as buffer(onBufferOverflow=KEEP_LATEST), non-suspending strategies are reasonably supported with 0 and default capacities. * Added reactive operator migration hints. * WhileSubscribed with kotlin.time.Duration params Fixes Kotlin#2034 Fixes Kotlin#2047 Co-authored-by: Ibraheem Zaman <1zaman@users.noreply.github.com> Co-authored-by: Thomas Vos <thomasjsvos@gmail.com> Co-authored-by: Travis Wyatt <travis.i.wyatt@gmail.com>
Summary of changes:
Fixes #2034
Fixes #2047