diff --git a/src/main/java/io/reactivex/processors/BehaviorProcessor.java b/src/main/java/io/reactivex/processors/BehaviorProcessor.java index 3b81f6d062..4a4a69b007 100644 --- a/src/main/java/io/reactivex/processors/BehaviorProcessor.java +++ b/src/main/java/io/reactivex/processors/BehaviorProcessor.java @@ -13,13 +13,13 @@ package io.reactivex.processors; -import io.reactivex.annotations.CheckReturnValue; import java.lang.reflect.Array; import java.util.concurrent.atomic.*; import java.util.concurrent.locks.*; import org.reactivestreams.*; +import io.reactivex.annotations.*; import io.reactivex.exceptions.MissingBackpressureException; import io.reactivex.internal.functions.ObjectHelper; import io.reactivex.internal.subscriptions.SubscriptionHelper; @@ -217,6 +217,41 @@ public void onComplete() { } } + /** + * Tries to emit the item to all currently subscribed Subscribers if all of them + * has requested some value, returns false otherwise. + *

+ * This method should be called in a sequential manner just like the onXXX methods + * of the PublishProcessor. + *

+ * Calling with null will terminate the PublishProcessor and a NullPointerException + * is signalled to the Subscribers. + * @param t the item to emit, not null + * @return true if the item was emitted to all Subscribers + * @since 2.0.8 - experimental + */ + @Experimental + public boolean offer(T t) { + if (t == null) { + onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources.")); + return true; + } + BehaviorSubscription[] array = subscribers.get(); + + for (BehaviorSubscription s : array) { + if (s.isFull()) { + return false; + } + } + + Object o = NotificationLite.next(t); + setCurrent(o); + for (BehaviorSubscription bs : array) { + bs.emitNext(o, index); + } + return true; + } + @Override public boolean hasSubscribers() { return subscribers.get().length != 0; @@ -538,5 +573,9 @@ void emitLoop() { q.forEachWhile(this); } } + + public boolean isFull() { + return get() == 0L; + } } } diff --git a/src/main/java/io/reactivex/processors/PublishProcessor.java b/src/main/java/io/reactivex/processors/PublishProcessor.java index 86abf96cab..554632c0e6 100644 --- a/src/main/java/io/reactivex/processors/PublishProcessor.java +++ b/src/main/java/io/reactivex/processors/PublishProcessor.java @@ -12,11 +12,11 @@ */ package io.reactivex.processors; -import io.reactivex.annotations.CheckReturnValue; import java.util.concurrent.atomic.*; import org.reactivestreams.*; +import io.reactivex.annotations.*; import io.reactivex.exceptions.MissingBackpressureException; import io.reactivex.internal.subscriptions.SubscriptionHelper; import io.reactivex.internal.util.BackpressureHelper; @@ -227,6 +227,39 @@ public void onComplete() { } } + /** + * Tries to emit the item to all currently subscribed Subscribers if all of them + * has requested some value, returns false otherwise. + *

+ * This method should be called in a sequential manner just like the onXXX methods + * of the PublishProcessor. + *

+ * Calling with null will terminate the PublishProcessor and a NullPointerException + * is signalled to the Subscribers. + * @param t the item to emit, not null + * @return true if the item was emitted to all Subscribers + * @since 2.0.8 - experimental + */ + @Experimental + public boolean offer(T t) { + if (t == null) { + onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources.")); + return true; + } + PublishSubscription[] array = subscribers.get(); + + for (PublishSubscription s : array) { + if (s.isFull()) { + return false; + } + } + + for (PublishSubscription s : array) { + s.onNext(t); + } + return true; + } + @Override public boolean hasSubscribers() { return subscribers.get().length != 0; @@ -321,5 +354,9 @@ public void cancel() { public boolean isCancelled() { return get() == Long.MIN_VALUE; } + + boolean isFull() { + return get() == 0L; + } } } diff --git a/src/test/java/io/reactivex/processors/BehaviorProcessorTest.java b/src/test/java/io/reactivex/processors/BehaviorProcessorTest.java index f91c8f98b1..48becde4ee 100644 --- a/src/test/java/io/reactivex/processors/BehaviorProcessorTest.java +++ b/src/test/java/io/reactivex/processors/BehaviorProcessorTest.java @@ -699,4 +699,61 @@ public void firstBackpressured() { assertFalse(p.hasSubscribers()); } + + @Test + public void offer() { + BehaviorProcessor pp = BehaviorProcessor.create(); + + TestSubscriber ts = pp.test(0); + + assertFalse(pp.offer(1)); + + ts.request(1); + + assertTrue(pp.offer(1)); + + assertFalse(pp.offer(2)); + + ts.cancel(); + + assertTrue(pp.offer(2)); + + ts = pp.test(1); + + assertTrue(pp.offer(null)); + + ts.assertFailure(NullPointerException.class, 2); + + assertTrue(pp.hasThrowable()); + assertTrue(pp.getThrowable().toString(), pp.getThrowable() instanceof NullPointerException); + } + + @Test + public void offerAsync() throws Exception { + final BehaviorProcessor pp = BehaviorProcessor.create(); + + Schedulers.single().scheduleDirect(new Runnable() { + @Override + public void run() { + while (!pp.hasSubscribers()) { + try { + Thread.sleep(1); + } catch (InterruptedException ex) { + return; + } + } + + for (int i = 1; i <= 10; i++) { + while (!pp.offer(i)) { } + } + pp.onComplete(); + } + }); + + Thread.sleep(1); + + pp.test() + .awaitDone(5, TimeUnit.SECONDS) + .assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); + } } diff --git a/src/test/java/io/reactivex/processors/PublishProcessorTest.java b/src/test/java/io/reactivex/processors/PublishProcessorTest.java index 82e7ed7ac8..3078250ac0 100644 --- a/src/test/java/io/reactivex/processors/PublishProcessorTest.java +++ b/src/test/java/io/reactivex/processors/PublishProcessorTest.java @@ -621,5 +621,60 @@ public void run() { } } + @Test + public void offer() { + PublishProcessor pp = PublishProcessor.create(); + + TestSubscriber ts = pp.test(0); + + assertFalse(pp.offer(1)); + + ts.request(1); + + assertTrue(pp.offer(1)); + + assertFalse(pp.offer(2)); + ts.cancel(); + + assertTrue(pp.offer(2)); + + ts = pp.test(0); + + assertTrue(pp.offer(null)); + + ts.assertFailure(NullPointerException.class); + + assertTrue(pp.hasThrowable()); + assertTrue(pp.getThrowable().toString(), pp.getThrowable() instanceof NullPointerException); + } + + @Test + public void offerAsync() throws Exception { + final PublishProcessor pp = PublishProcessor.create(); + + Schedulers.single().scheduleDirect(new Runnable() { + @Override + public void run() { + while (!pp.hasSubscribers()) { + try { + Thread.sleep(1); + } catch (InterruptedException ex) { + return; + } + } + + for (int i = 1; i <= 10; i++) { + while (!pp.offer(i)) { } + } + pp.onComplete(); + } + }); + + Thread.sleep(1); + + pp.test() + .awaitDone(5, TimeUnit.SECONDS) + .assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); + } } diff --git a/src/test/java/io/reactivex/tck/AsyncProcessorAsPublisherTckTest.java b/src/test/java/io/reactivex/tck/AsyncProcessorAsPublisherTckTest.java new file mode 100644 index 0000000000..b1e3ba4405 --- /dev/null +++ b/src/test/java/io/reactivex/tck/AsyncProcessorAsPublisherTckTest.java @@ -0,0 +1,62 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.tck; + +import org.reactivestreams.Publisher; +import org.testng.annotations.Test; + +import io.reactivex.processors.AsyncProcessor; +import io.reactivex.schedulers.Schedulers; + +@Test +public class AsyncProcessorAsPublisherTckTest extends BaseTck { + + public AsyncProcessorAsPublisherTckTest() { + super(100); + } + + @Override + public Publisher createPublisher(final long elements) { + final AsyncProcessor pp = AsyncProcessor.create(); + + Schedulers.io().scheduleDirect(new Runnable() { + @Override + public void run() { + long start = System.currentTimeMillis(); + while (!pp.hasSubscribers()) { + try { + Thread.sleep(1); + } catch (InterruptedException ex) { + return; + } + + if (System.currentTimeMillis() - start > 200) { + return; + } + } + + for (int i = 0; i < elements; i++) { + pp.onNext(i); + } + pp.onComplete(); + } + }); + return pp; + } + + @Override + public long maxElementsFromPublisher() { + return 1; + } +} diff --git a/src/test/java/io/reactivex/tck/BehaviorProcessorAsPublisherTckTest.java b/src/test/java/io/reactivex/tck/BehaviorProcessorAsPublisherTckTest.java new file mode 100644 index 0000000000..cf1a47887e --- /dev/null +++ b/src/test/java/io/reactivex/tck/BehaviorProcessorAsPublisherTckTest.java @@ -0,0 +1,58 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.tck; + +import org.reactivestreams.Publisher; +import org.testng.annotations.Test; + +import io.reactivex.processors.BehaviorProcessor; +import io.reactivex.schedulers.Schedulers; + +@Test +public class BehaviorProcessorAsPublisherTckTest extends BaseTck { + + @Override + public Publisher createPublisher(final long elements) { + final BehaviorProcessor pp = BehaviorProcessor.create(); + + Schedulers.io().scheduleDirect(new Runnable() { + @Override + public void run() { + long start = System.currentTimeMillis(); + while (!pp.hasSubscribers()) { + try { + Thread.sleep(1); + } catch (InterruptedException ex) { + return; + } + + if (System.currentTimeMillis() - start > 200) { + return; + } + } + + for (int i = 0; i < elements; i++) { + while (!pp.offer(i)) { + Thread.yield(); + if (System.currentTimeMillis() - start > 1000) { + return; + } + } + } + pp.onComplete(); + } + }); + return pp; + } +} diff --git a/src/test/java/io/reactivex/tck/PublishProcessorAsPublisherTckTest.java b/src/test/java/io/reactivex/tck/PublishProcessorAsPublisherTckTest.java new file mode 100644 index 0000000000..7b82099650 --- /dev/null +++ b/src/test/java/io/reactivex/tck/PublishProcessorAsPublisherTckTest.java @@ -0,0 +1,62 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.tck; + +import org.reactivestreams.Publisher; +import org.testng.annotations.Test; + +import io.reactivex.processors.PublishProcessor; +import io.reactivex.schedulers.Schedulers; + +@Test +public class PublishProcessorAsPublisherTckTest extends BaseTck { + + public PublishProcessorAsPublisherTckTest() { + super(100); + } + + @Override + public Publisher createPublisher(final long elements) { + final PublishProcessor pp = PublishProcessor.create(); + + Schedulers.io().scheduleDirect(new Runnable() { + @Override + public void run() { + long start = System.currentTimeMillis(); + while (!pp.hasSubscribers()) { + try { + Thread.sleep(1); + } catch (InterruptedException ex) { + return; + } + + if (System.currentTimeMillis() - start > 200) { + return; + } + } + + for (int i = 0; i < elements; i++) { + while (!pp.offer(i)) { + Thread.yield(); + if (System.currentTimeMillis() - start > 1000) { + return; + } + } + } + pp.onComplete(); + } + }); + return pp; + } +} diff --git a/src/test/java/io/reactivex/tck/ReplayProcessorSizeBoundAsPublisherTckTest.java b/src/test/java/io/reactivex/tck/ReplayProcessorSizeBoundAsPublisherTckTest.java new file mode 100644 index 0000000000..9e33b9b15d --- /dev/null +++ b/src/test/java/io/reactivex/tck/ReplayProcessorSizeBoundAsPublisherTckTest.java @@ -0,0 +1,57 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.tck; + +import org.reactivestreams.Publisher; +import org.testng.annotations.Test; + +import io.reactivex.processors.ReplayProcessor; +import io.reactivex.schedulers.Schedulers; + +@Test +public class ReplayProcessorSizeBoundAsPublisherTckTest extends BaseTck { + + public ReplayProcessorSizeBoundAsPublisherTckTest() { + super(100); + } + + @Override + public Publisher createPublisher(final long elements) { + final ReplayProcessor pp = ReplayProcessor.createWithSize((int)elements + 10); + + Schedulers.io().scheduleDirect(new Runnable() { + @Override + public void run() { + long start = System.currentTimeMillis(); + while (!pp.hasSubscribers()) { + try { + Thread.sleep(1); + } catch (InterruptedException ex) { + return; + } + + if (System.currentTimeMillis() - start > 200) { + return; + } + } + + for (int i = 0; i < elements; i++) { + pp.onNext(i); + } + pp.onComplete(); + } + }); + return pp; + } +} diff --git a/src/test/java/io/reactivex/tck/ReplayProcessorTimeBoundAsPublisherTckTest.java b/src/test/java/io/reactivex/tck/ReplayProcessorTimeBoundAsPublisherTckTest.java new file mode 100644 index 0000000000..6edda768e6 --- /dev/null +++ b/src/test/java/io/reactivex/tck/ReplayProcessorTimeBoundAsPublisherTckTest.java @@ -0,0 +1,59 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.tck; + +import java.util.concurrent.TimeUnit; + +import org.reactivestreams.Publisher; +import org.testng.annotations.Test; + +import io.reactivex.processors.ReplayProcessor; +import io.reactivex.schedulers.Schedulers; + +@Test +public class ReplayProcessorTimeBoundAsPublisherTckTest extends BaseTck { + + public ReplayProcessorTimeBoundAsPublisherTckTest() { + super(100); + } + + @Override + public Publisher createPublisher(final long elements) { + final ReplayProcessor pp = ReplayProcessor.createWithTime(1, TimeUnit.MINUTES, Schedulers.computation()); + + Schedulers.io().scheduleDirect(new Runnable() { + @Override + public void run() { + long start = System.currentTimeMillis(); + while (!pp.hasSubscribers()) { + try { + Thread.sleep(1); + } catch (InterruptedException ex) { + return; + } + + if (System.currentTimeMillis() - start > 200) { + return; + } + } + + for (int i = 0; i < elements; i++) { + pp.onNext(i); + } + pp.onComplete(); + } + }); + return pp; + } +} diff --git a/src/test/java/io/reactivex/tck/ReplayProcessorUnboundedAsPublisherTckTest.java b/src/test/java/io/reactivex/tck/ReplayProcessorUnboundedAsPublisherTckTest.java new file mode 100644 index 0000000000..10796599aa --- /dev/null +++ b/src/test/java/io/reactivex/tck/ReplayProcessorUnboundedAsPublisherTckTest.java @@ -0,0 +1,57 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.tck; + +import org.reactivestreams.Publisher; +import org.testng.annotations.Test; + +import io.reactivex.processors.ReplayProcessor; +import io.reactivex.schedulers.Schedulers; + +@Test +public class ReplayProcessorUnboundedAsPublisherTckTest extends BaseTck { + + public ReplayProcessorUnboundedAsPublisherTckTest() { + super(100); + } + + @Override + public Publisher createPublisher(final long elements) { + final ReplayProcessor pp = ReplayProcessor.create(); + + Schedulers.io().scheduleDirect(new Runnable() { + @Override + public void run() { + long start = System.currentTimeMillis(); + while (!pp.hasSubscribers()) { + try { + Thread.sleep(1); + } catch (InterruptedException ex) { + return; + } + + if (System.currentTimeMillis() - start > 200) { + return; + } + } + + for (int i = 0; i < elements; i++) { + pp.onNext(i); + } + pp.onComplete(); + } + }); + return pp; + } +} diff --git a/src/test/java/io/reactivex/tck/UnicastProcessorAsPublisherTckTest.java b/src/test/java/io/reactivex/tck/UnicastProcessorAsPublisherTckTest.java new file mode 100644 index 0000000000..d64164936b --- /dev/null +++ b/src/test/java/io/reactivex/tck/UnicastProcessorAsPublisherTckTest.java @@ -0,0 +1,57 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.tck; + +import org.reactivestreams.Publisher; +import org.testng.annotations.Test; + +import io.reactivex.processors.*; +import io.reactivex.schedulers.Schedulers; + +@Test +public class UnicastProcessorAsPublisherTckTest extends BaseTck { + + public UnicastProcessorAsPublisherTckTest() { + super(100); + } + + @Override + public Publisher createPublisher(final long elements) { + final UnicastProcessor pp = UnicastProcessor.create(); + + Schedulers.io().scheduleDirect(new Runnable() { + @Override + public void run() { + long start = System.currentTimeMillis(); + while (!pp.hasSubscribers()) { + try { + Thread.sleep(1); + } catch (InterruptedException ex) { + return; + } + + if (System.currentTimeMillis() - start > 200) { + return; + } + } + + for (int i = 0; i < elements; i++) { + pp.onNext(i); + } + pp.onComplete(); + } + }); + return pp; + } +}