Skip to content

Commit

Permalink
2.x: add offer() method to Publish & Behavior Processors (#5184)
Browse files Browse the repository at this point in the history
* 2.x: add offer() method to Publish & Behavior Processors

* Sleep instead of yield.
  • Loading branch information
akarnokd authored Mar 16, 2017
1 parent db75e89 commit 7673c09
Show file tree
Hide file tree
Showing 11 changed files with 602 additions and 2 deletions.
41 changes: 40 additions & 1 deletion src/main/java/io/reactivex/processors/BehaviorProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@

package io.reactivex.processors;

import io.reactivex.annotations.CheckReturnValue;
import java.lang.reflect.Array;
import java.util.concurrent.atomic.*;
import java.util.concurrent.locks.*;

import org.reactivestreams.*;

import io.reactivex.annotations.*;
import io.reactivex.exceptions.MissingBackpressureException;
import io.reactivex.internal.functions.ObjectHelper;
import io.reactivex.internal.subscriptions.SubscriptionHelper;
Expand Down Expand Up @@ -217,6 +217,41 @@ public void onComplete() {
}
}

/**
* Tries to emit the item to all currently subscribed Subscribers if all of them
* has requested some value, returns false otherwise.
* <p>
* This method should be called in a sequential manner just like the onXXX methods
* of the PublishProcessor.
* <p>
* Calling with null will terminate the PublishProcessor and a NullPointerException
* is signalled to the Subscribers.
* @param t the item to emit, not null
* @return true if the item was emitted to all Subscribers
* @since 2.0.8 - experimental
*/
@Experimental
public boolean offer(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return true;
}
BehaviorSubscription<T>[] array = subscribers.get();

for (BehaviorSubscription<T> s : array) {
if (s.isFull()) {
return false;
}
}

Object o = NotificationLite.next(t);
setCurrent(o);
for (BehaviorSubscription<T> bs : array) {
bs.emitNext(o, index);
}
return true;
}

@Override
public boolean hasSubscribers() {
return subscribers.get().length != 0;
Expand Down Expand Up @@ -538,5 +573,9 @@ void emitLoop() {
q.forEachWhile(this);
}
}

public boolean isFull() {
return get() == 0L;
}
}
}
39 changes: 38 additions & 1 deletion src/main/java/io/reactivex/processors/PublishProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@
*/
package io.reactivex.processors;

import io.reactivex.annotations.CheckReturnValue;
import java.util.concurrent.atomic.*;

import org.reactivestreams.*;

import io.reactivex.annotations.*;
import io.reactivex.exceptions.MissingBackpressureException;
import io.reactivex.internal.subscriptions.SubscriptionHelper;
import io.reactivex.internal.util.BackpressureHelper;
Expand Down Expand Up @@ -227,6 +227,39 @@ public void onComplete() {
}
}

/**
* Tries to emit the item to all currently subscribed Subscribers if all of them
* has requested some value, returns false otherwise.
* <p>
* This method should be called in a sequential manner just like the onXXX methods
* of the PublishProcessor.
* <p>
* Calling with null will terminate the PublishProcessor and a NullPointerException
* is signalled to the Subscribers.
* @param t the item to emit, not null
* @return true if the item was emitted to all Subscribers
* @since 2.0.8 - experimental
*/
@Experimental
public boolean offer(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return true;
}
PublishSubscription<T>[] array = subscribers.get();

for (PublishSubscription<T> s : array) {
if (s.isFull()) {
return false;
}
}

for (PublishSubscription<T> s : array) {
s.onNext(t);
}
return true;
}

@Override
public boolean hasSubscribers() {
return subscribers.get().length != 0;
Expand Down Expand Up @@ -321,5 +354,9 @@ public void cancel() {
public boolean isCancelled() {
return get() == Long.MIN_VALUE;
}

boolean isFull() {
return get() == 0L;
}
}
}
57 changes: 57 additions & 0 deletions src/test/java/io/reactivex/processors/BehaviorProcessorTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -699,4 +699,61 @@ public void firstBackpressured() {

assertFalse(p.hasSubscribers());
}

@Test
public void offer() {
BehaviorProcessor<Integer> pp = BehaviorProcessor.create();

TestSubscriber<Integer> ts = pp.test(0);

assertFalse(pp.offer(1));

ts.request(1);

assertTrue(pp.offer(1));

assertFalse(pp.offer(2));

ts.cancel();

assertTrue(pp.offer(2));

ts = pp.test(1);

assertTrue(pp.offer(null));

ts.assertFailure(NullPointerException.class, 2);

assertTrue(pp.hasThrowable());
assertTrue(pp.getThrowable().toString(), pp.getThrowable() instanceof NullPointerException);
}

@Test
public void offerAsync() throws Exception {
final BehaviorProcessor<Integer> pp = BehaviorProcessor.create();

Schedulers.single().scheduleDirect(new Runnable() {
@Override
public void run() {
while (!pp.hasSubscribers()) {
try {
Thread.sleep(1);
} catch (InterruptedException ex) {
return;
}
}

for (int i = 1; i <= 10; i++) {
while (!pp.offer(i)) { }
}
pp.onComplete();
}
});

Thread.sleep(1);

pp.test()
.awaitDone(5, TimeUnit.SECONDS)
.assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
}
}
55 changes: 55 additions & 0 deletions src/test/java/io/reactivex/processors/PublishProcessorTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -621,5 +621,60 @@ public void run() {
}
}

@Test
public void offer() {
PublishProcessor<Integer> pp = PublishProcessor.create();

TestSubscriber<Integer> ts = pp.test(0);

assertFalse(pp.offer(1));

ts.request(1);

assertTrue(pp.offer(1));

assertFalse(pp.offer(2));

ts.cancel();

assertTrue(pp.offer(2));

ts = pp.test(0);

assertTrue(pp.offer(null));

ts.assertFailure(NullPointerException.class);

assertTrue(pp.hasThrowable());
assertTrue(pp.getThrowable().toString(), pp.getThrowable() instanceof NullPointerException);
}

@Test
public void offerAsync() throws Exception {
final PublishProcessor<Integer> pp = PublishProcessor.create();

Schedulers.single().scheduleDirect(new Runnable() {
@Override
public void run() {
while (!pp.hasSubscribers()) {
try {
Thread.sleep(1);
} catch (InterruptedException ex) {
return;
}
}

for (int i = 1; i <= 10; i++) {
while (!pp.offer(i)) { }
}
pp.onComplete();
}
});

Thread.sleep(1);

pp.test()
.awaitDone(5, TimeUnit.SECONDS)
.assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/**
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.tck;

import org.reactivestreams.Publisher;
import org.testng.annotations.Test;

import io.reactivex.processors.AsyncProcessor;
import io.reactivex.schedulers.Schedulers;

@Test
public class AsyncProcessorAsPublisherTckTest extends BaseTck<Integer> {

public AsyncProcessorAsPublisherTckTest() {
super(100);
}

@Override
public Publisher<Integer> createPublisher(final long elements) {
final AsyncProcessor<Integer> pp = AsyncProcessor.create();

Schedulers.io().scheduleDirect(new Runnable() {
@Override
public void run() {
long start = System.currentTimeMillis();
while (!pp.hasSubscribers()) {
try {
Thread.sleep(1);
} catch (InterruptedException ex) {
return;
}

if (System.currentTimeMillis() - start > 200) {
return;
}
}

for (int i = 0; i < elements; i++) {
pp.onNext(i);
}
pp.onComplete();
}
});
return pp;
}

@Override
public long maxElementsFromPublisher() {
return 1;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/**
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.tck;

import org.reactivestreams.Publisher;
import org.testng.annotations.Test;

import io.reactivex.processors.BehaviorProcessor;
import io.reactivex.schedulers.Schedulers;

@Test
public class BehaviorProcessorAsPublisherTckTest extends BaseTck<Integer> {

@Override
public Publisher<Integer> createPublisher(final long elements) {
final BehaviorProcessor<Integer> pp = BehaviorProcessor.create();

Schedulers.io().scheduleDirect(new Runnable() {
@Override
public void run() {
long start = System.currentTimeMillis();
while (!pp.hasSubscribers()) {
try {
Thread.sleep(1);
} catch (InterruptedException ex) {
return;
}

if (System.currentTimeMillis() - start > 200) {
return;
}
}

for (int i = 0; i < elements; i++) {
while (!pp.offer(i)) {
Thread.yield();
if (System.currentTimeMillis() - start > 1000) {
return;
}
}
}
pp.onComplete();
}
});
return pp;
}
}
Loading

0 comments on commit 7673c09

Please sign in to comment.