Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2.x: Fix buffer(open, close) not disposing indicators properly #5811

Merged
merged 3 commits into from
Jan 19, 2018

Conversation

akarnokd
Copy link
Member

The PR fixes the resource management in the buffer operator that uses other reactive sources to indicate when a buffer starts and ends. Both Flowable and Observable implementations had to be fixed.

Fixes: #5809

@simonbasle
Copy link
Contributor

@akarnokd let me know if I understand the spirit of this PR on FlowableBufferBoundary:

  1. avoids keeping reference to / tracking buffers in subscribers other than the main
  2. cancel source on open subscriber completion (if it completes and the last buffer's close publisher has completed, subscribers will have size 0 and we can safely cancel upstream as no new buffer will open at this point)
  3. in main, distinguish a open subscriber error from a close subscriber error (due to 1., the later needs associated buffer cleanup)

Is that correct/exhaustive?

@akarnokd
Copy link
Member Author

avoids keeping reference to / tracking buffers in subscribers other than the main

Yes.

cancel source on open subscriber completion (if it completes and the last buffer's close publisher has completed, subscribers will have size 0 and we can safely cancel upstream as no new buffer will open at this point

Yes.

in main, distinguish a open subscriber error from a close subscriber error (due to 1., the later needs associated buffer cleanup

No. If the openError or closeError is fired, both should cancel the main source, cancel the subscribers other than themselves, clean the buffer and then report the error. They could be converted into a common method of boundsError(Disposable d, Throwable ex).

@codecov
Copy link

codecov bot commented Jan 16, 2018

Codecov Report

Merging #5811 into 2.x will increase coverage by 0.11%.
The diff coverage is 97.35%.

Impacted file tree graph

@@             Coverage Diff              @@
##                2.x    #5811      +/-   ##
============================================
+ Coverage     96.28%   96.39%   +0.11%     
+ Complexity     5817     5814       -3     
============================================
  Files           634      634              
  Lines         41647    41761     +114     
  Branches       5776     5796      +20     
============================================
+ Hits          40098    40255     +157     
+ Misses          611      583      -28     
+ Partials        938      923      -15
Impacted Files Coverage Δ Complexity Δ
...nal/operators/flowable/FlowableBufferBoundary.java 96.05% <96.29%> (+18.85%) 2 <1> (ø) ⬇️
...operators/observable/ObservableBufferBoundary.java 98.31% <98.57%> (+20.45%) 2 <1> (ø) ⬇️
.../operators/completable/CompletableConcatArray.java 93.75% <0%> (-6.25%) 2% <0%> (ø)
...nternal/operators/parallel/ParallelReduceFull.java 91.17% <0%> (-3.93%) 2% <0%> (ø)
...l/operators/observable/ObservableFlatMapMaybe.java 84.96% <0%> (-2.62%) 2% <0%> (ø)
...ain/java/io/reactivex/subjects/PublishSubject.java 97.8% <0%> (-2.2%) 38% <0%> (-1%)
.../internal/disposables/ListCompositeDisposable.java 98% <0%> (-2%) 34% <0%> (-1%)
...ex/internal/subscribers/InnerQueuedSubscriber.java 96.07% <0%> (-1.97%) 18% <0%> (-1%)
...ternal/operators/flowable/FlowableSampleTimed.java 95.58% <0%> (-1.48%) 3% <0%> (ø)
...a/io/reactivex/internal/util/QueueDrainHelper.java 63.19% <0%> (-1.39%) 36% <0%> (-1%)
... and 20 more

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update aa31330...04598ce. Read the comment docs.

for (C b : bufs.values()) {
queue.offer(b);
}
bufs = null;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't this be buffers = null?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, fixed. Also combined the error methods and changed to LinkedHashMap so that the final open buffers in onComplete remain in their order they were opened.

Copy link
Contributor

@simonbasle simonbasle left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

got it, looking better with the buffers null-out and the mutualized boundaryError 👍

Copy link
Collaborator

@davidmoten davidmoten left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

gave quick scan only

}

emitted = e;
missed = addAndGet(-missed);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

requested not getting reduced by emitted?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No longer necessary in these type of drain loops. requested can only grow and the emitted will follow it separately. This saves an atomic decrement here.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, beaut

resources.add(bcs);
void boundaryError(Disposable subscriber, Throwable ex) {
SubscriptionHelper.cancel(upstream);
subscribers.delete(subscriber);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we just dispose() all subscribers here and remove dispose() call below?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same in Observable version

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It avoids disposing the caller which is known to have reached its terminal state.

a.onError(ex);
return;
} else
if (q.isEmpty()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: formatting, this line typically goes on the same line with else

} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
SubscriptionHelper.cancel(upstream);
if (errors.addThrowable(ex)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if/else block seems to be exact same as onError(), let's just call it here?

void boundaryError(Disposable subscriber, Throwable ex) {
SubscriptionHelper.cancel(upstream);
subscribers.delete(subscriber);
if (errors.addThrowable(ex)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if/else block seems to be exact same as onError(), let's just call it here?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same in Observable version

@Test
@SuppressWarnings("unchecked")
public void boundaryOpenCloseDisposedOnComplete() {
PublishProcessor<Integer> pp0 = PublishProcessor.create();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: upstream

public void boundaryOpenCloseDisposedOnComplete() {
PublishProcessor<Integer> pp0 = PublishProcessor.create();

PublishProcessor<Integer> pp1 = PublishProcessor.create();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: openingIndicator


PublishProcessor<Integer> pp1 = PublishProcessor.create();

PublishProcessor<Integer> pp2 = PublishProcessor.create();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: closingIndicator

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and same in some tests below

Copy link
Contributor

@artem-zinnatullin artem-zinnatullin left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lgtm

@akarnokd akarnokd merged commit eb426fd into ReactiveX:2.x Jan 19, 2018
@akarnokd akarnokd deleted the BufferOpenCloseFix branch January 19, 2018 09:36
Copy link
Collaborator

@vanniktech vanniktech left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2.x: FlowableBufferBoundary doesn't stop opening new windows in overlapping case
5 participants