OperatorTake (and others) use of mutating non-volatile fields #1334

Closed
davidmoten opened this issue Jun 6, 2014 · 11 comments

Comments

@davidmoten
Collaborator

I'd love some clarification on this issue. Given, say, this Observable:

Observable.range(1,1000).observeOn(Schedulers.computation()).take(500);

Why is the count field in OperatorTake not protected against access from multiple threads with the volatile keyword (or by using an AtomicInteger)?

@benjchristensen
Member

Because the Observable contract is that an Observer/Subscriber never receives concurrent invocations of any of its on* methods. Anything emitting to it must either be naturally single-threaded or serialize its emissions. This allows an Observer/Subscriber to always be written assuming single-threaded behavior.
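
To make the contract concrete, here is a minimal, RxJava-free sketch of a take-style subscriber. The `Sink` interface, the `TakeSink` class, and the `run` helper are hypothetical stand-ins, not RxJava's API; the point is only that a plain `int` counter is enough when every `onNext` call arrives from one thread at a time.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, RxJava-free sketch: a minimal "take"-style subscriber that keeps
// its counter in a plain (non-volatile) int. This is only safe because the Rx
// contract promises that on* methods are never invoked concurrently.
public class TakeSketch {

    // Hypothetical stand-in for an Rx Subscriber; not the RxJava interface.
    interface Sink<T> {
        void onNext(T t);
        void onCompleted();
    }

    static final class TakeSink<T> implements Sink<T> {
        private final Sink<T> child;
        private final int limit;
        private int count;        // plain field: no volatile, no AtomicInteger
        private boolean done;     // plain field as well

        TakeSink(Sink<T> child, int limit) {
            this.child = child;
            this.limit = limit;
        }

        @Override
        public void onNext(T t) {
            if (done) {
                return;           // ignore anything after the limit
            }
            child.onNext(t);
            if (++count == limit) {
                done = true;
                child.onCompleted();
            }
        }

        @Override
        public void onCompleted() {
            if (!done) {
                done = true;
                child.onCompleted();
            }
        }
    }

    // Drives the sink from a single thread, as range(1, n) would.
    static List<Integer> run(int n, int limit) {
        List<Integer> received = new ArrayList<>();
        Sink<Integer> collector = new Sink<Integer>() {
            @Override public void onNext(Integer t) { received.add(t); }
            @Override public void onCompleted() { }
        };
        TakeSink<Integer> take = new TakeSink<>(collector, limit);
        for (int i = 1; i <= n; i++) {
            take.onNext(i);
        }
        take.onCompleted();
        return received;
    }

    public static void main(String[] args) {
        List<Integer> out = run(1000, 500);
        System.out.println(out.size() + " items, last = " + out.get(out.size() - 1));
    }
}
```

Running `main` prints `500 items, last = 500`.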

@davidmoten
Collaborator Author

Thanks, Ben. I'm happy with it not receiving concurrent invocations, but isn't there still the problem of the visibility of the count variable to the different threads? I didn't think you needed concurrency to have a visibility problem, just multiple threads.
(Email reply to Ben Christensen's comment of 7 Jun 2014 08:53.)

@benjchristensen
Member

If multiple threads are invoking onNext, then it is the responsibility of the Observable to ensure those calls are thread-safe, and that covers visibility too. This is why serialize() exists: for an Observable that cannot handle this itself.
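
One way a serialize()-style wrapper can provide both mutual exclusion and visibility is the queue-and-drain pattern sketched below. This is a hypothetical illustration (the `Sink`, `CountingSink`, and `SerializedSink` names are invented here), not RxJava's actual serialize() implementation, and error handling plus onError/onCompleted are omitted.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of what a serialize()-style wrapper has to do. It is NOT
// RxJava's code, just one common "queue and drain" approach.
public class SerializeSketch {

    interface Sink<T> {
        void onNext(T t);
    }

    // Downstream written as if single-threaded: plain, non-volatile counter.
    static final class CountingSink implements Sink<Integer> {
        int count;
        @Override public void onNext(Integer t) { count++; }
    }

    // At most one thread delivers at a time. Threads that arrive while another
    // is delivering enqueue their item and leave; the deliverer drains the queue.
    // Handing the "emitting" role over under the lock orders every delivery after
    // the previous one (happens-before), which also covers visibility.
    // Error handling is omitted: if actual.onNext throws, emitting stays true.
    static final class SerializedSink<T> implements Sink<T> {
        private final Sink<T> actual;
        private final ArrayDeque<T> queue = new ArrayDeque<>(); // guarded by this
        private boolean emitting;                               // guarded by this

        SerializedSink(Sink<T> actual) { this.actual = actual; }

        @Override
        public void onNext(T t) {
            synchronized (this) {
                if (emitting) {
                    queue.add(t);          // the current deliverer will pick it up
                    return;
                }
                emitting = true;
            }
            actual.onNext(t);              // delivered outside the lock
            while (true) {
                T next;
                synchronized (this) {
                    next = queue.poll();
                    if (next == null) {
                        emitting = false;  // hand the role back
                        return;
                    }
                }
                actual.onNext(next);
            }
        }
    }

    static int run(int threads, int perThread) {
        CountingSink counter = new CountingSink();
        Sink<Integer> serialized = new SerializedSink<>(counter);
        List<Thread> workers = new ArrayList<>();
        for (int w = 0; w < threads; w++) {
            Thread t = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    serialized.onNext(i);
                }
            });
            workers.add(t);
            t.start();
        }
        try {
            for (Thread t : workers) {
                t.join();   // join() makes each worker's effects visible here
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return counter.count;
    }

    public static void main(String[] args) {
        System.out.println(run(4, 10_000));   // expected: 40000
    }
}
```

Because only one thread at a time holds the "emitting" role, `CountingSink` never loses an increment even though its field is not volatile.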

@davidmoten
Collaborator Author

So in my example, would .serialize() need to be called between observeOn and take?
(Email reply to Ben Christensen's comment of 7 Jun 2014 09:42.)

@benjchristensen
Member

No, you don't need it, as each of those operators conforms to the Rx contract. There is only one thread in your code ever invoking OperatorTake: a single thread from the Schedulers.computation() event-loop pool.
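
The single-worker behaviour can be imitated without RxJava, assuming a single-thread `ExecutorService` is a fair stand-in for the one computation-scheduler worker that observeOn uses. `SingleWorkerSketch` and its methods are hypothetical; the sketch counts the onNext-style calls and records which threads ran them.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Sketch, assuming a single-threaded executor is a fair stand-in for the one
// worker that observeOn(Schedulers.computation()) takes from the pool.
public class SingleWorkerSketch {

    private int count;                                   // plain, non-volatile
    private final Set<String> threadNames = new HashSet<>();

    // Plays the role of take's onNext: only ever called on the worker thread.
    private void onNext(int value) {
        count++;
        threadNames.add(Thread.currentThread().getName());
    }

    // Returns {number of onNext calls, number of distinct threads seen}.
    static int[] run(int n) {
        SingleWorkerSketch sink = new SingleWorkerSketch();
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            for (int i = 1; i <= n; i++) {
                final int value = i;
                worker.execute(() -> sink.onNext(value));  // one at a time, in order
            }
            // Read the state on the worker too; Future.get() then hands the
            // snapshot to this thread with a documented happens-before edge.
            Callable<int[]> snapshot = () -> new int[] {sink.count, sink.threadNames.size()};
            Future<int[]> result = worker.submit(snapshot);
            return result.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            worker.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] r = run(1000);
        System.out.println(r[0] + " calls on " + r[1] + " thread(s)");
    }
}
```

With `run(1000)` this reports 1000 calls on 1 thread, which mirrors why OperatorTake needs no volatile field after observeOn.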

@benjchristensen
Member

You should never have to use serialize() unless you come across an Observable that does not comply with the contract.

@davidmoten
Collaborator Author

I did see that when I tested it, but I didn't understand how the computation scheduler reused the same thread. I expected to see different threads hitting the take. I'll have to look more closely at how the scheduler works; that sounds very nifty.

I can say that I have seen multiple threads hitting my custom operator downstream of a parallel. Is this expected behaviour?
(Email reply to Ben Christensen's comment of 7 Jun 2014 09:48.)

@davidmoten
Collaborator Author

Ah, whoops, I see my misunderstanding with the Scheduler: it draws one thread from the pool, of course. I should have phrased the question using parallel:

Observable.range(1,1000).parallel(...).take(500);

davidmoten reopened this Jun 7, 2014
@benjchristensen
Member

Yes, with parallel it is expected that multiple threads call onNext on take in your example. It is still thread-safe in take because parallel serializes its output via merge (https://github.com/Netflix/RxJava/blob/master/rxjava-core/src/main/java/rx/internal/operators/OperatorParallel.java#L69), which uses synchronization/volatile fields that ensure visibility across threads.
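
The parallel-then-take case can be sketched the same way: several producer threads, a lock-based merge-like stage, and a take-style stage with plain fields. All class and method names below are hypothetical, and a real merge/take pair also handles completion and unsubscription, which this sketch omits.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: several producer threads (standing in for parallel's
// workers) emit through a merge-like stage that serializes them with a lock,
// into a take(limit)-style stage that keeps plain, non-volatile fields.
public class ParallelTakeSketch {

    // Written as if single-threaded, like OperatorTake. Unlike real take, it
    // simply drops items past the limit instead of unsubscribing upstream.
    static final class Take {
        private final int limit;
        private int count;                                 // plain field
        private final List<Integer> emitted = new ArrayList<>();

        Take(int limit) { this.limit = limit; }

        void onNext(int value) {
            if (count < limit) {
                count++;
                emitted.add(value);
            }
        }
    }

    // Merge-like stage: one monitor serializes all producers. Each unlock
    // happens-before the next lock, so Take always sees its latest state.
    static final class Merge {
        private final Take downstream;

        Merge(Take downstream) { this.downstream = downstream; }

        synchronized void onNext(int value) { downstream.onNext(value); }

        synchronized int emittedCount() { return downstream.emitted.size(); }
    }

    static int run(int threads, int perThread, int limit) {
        Merge merge = new Merge(new Take(limit));
        List<Thread> producers = new ArrayList<>();
        for (int w = 0; w < threads; w++) {
            final int base = w * perThread;
            Thread t = new Thread(() -> {
                for (int i = 1; i <= perThread; i++) {
                    merge.onNext(base + i);
                }
            });
            producers.add(t);
            t.start();
        }
        try {
            for (Thread t : producers) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return merge.emittedCount();
    }

    public static void main(String[] args) {
        // 4 threads x 250 items = 1000 items in; take(500) lets 500 through
        System.out.println(run(4, 250, 500));
    }
}
```

Which 500 values get through depends on thread scheduling, but the count is always exactly 500 because the lock in `Merge` serializes every call into `Take`.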

@davidmoten
Collaborator Author

Great, Ben. That will improve performance for my use case now that I can use non-volatile mutable fields.
(Email reply to Ben Christensen's comment of 7 Jun 2014 10:24.)

@davidmoten
Collaborator Author

All queries answered, thanks.


2 participants