Skip to content

Commit

Permalink
Multi ifEmpty (#3470)
Browse files Browse the repository at this point in the history
* Multi.ifEmpty

Signed-off-by: Daniel Kec <daniel.kec@oracle.com>
  • Loading branch information
danielkec authored Oct 6, 2021
1 parent 94da9ea commit c9d9734
Show file tree
Hide file tree
Showing 11 changed files with 449 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -848,6 +848,17 @@ default Multi<T> onTerminate(Runnable onTerminate) {
onTerminate);
}

/**
* Executes given {@link java.lang.Runnable} when stream is finished without value(empty stream).
*
* @param ifEmpty {@link java.lang.Runnable} to be executed.
* @return Multi
*/
default Multi<T> ifEmpty(Runnable ifEmpty) {
Objects.requireNonNull(ifEmpty, "ifEmpty callback is null");
return new MultiIfEmptyPublisher<>(this, ifEmpty);
}

/**
* Invoke provided consumer for every item in stream.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* Copyright (c) 2021 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package io.helidon.common.reactive;

import java.util.Objects;
import java.util.concurrent.Flow;

/**
* Executes given {@link java.lang.Runnable} when stream is finished without value(empty stream).
*
* @param <T> the item type
*/
final class MultiIfEmptyPublisher<T> implements Multi<T> {

private final Multi<T> source;
private final Runnable ifEmpty;

MultiIfEmptyPublisher(Multi<T> source, Runnable ifEmpty) {
this.source = source;
this.ifEmpty = ifEmpty;
}

@Override
public void subscribe(Flow.Subscriber<? super T> subscriber) {
Objects.requireNonNull(subscriber, "subscriber is null");
source.subscribe(new IfEmptySubscriber<>(subscriber, ifEmpty));
}

static final class IfEmptySubscriber<T> implements Flow.Subscriber<T>, Flow.Subscription {

private final Flow.Subscriber<? super T> downstream;
private final Runnable ifEmpty;

private boolean empty;

private Flow.Subscription upstream;

IfEmptySubscriber(Flow.Subscriber<? super T> downstream, Runnable ifEmpty) {
this.downstream = downstream;
this.ifEmpty = ifEmpty;
this.empty = true;
}

@Override
public void onSubscribe(Flow.Subscription subscription) {
SubscriptionHelper.validate(upstream, subscription);
upstream = subscription;
downstream.onSubscribe(this);
}

@Override
public void onNext(T item) {
Flow.Subscription s = upstream;
if (s != SubscriptionHelper.CANCELED) {
empty = false;
downstream.onNext(item);
}
}

@Override
public void onError(Throwable throwable) {
Flow.Subscription s = upstream;
if (s != SubscriptionHelper.CANCELED) {
upstream = SubscriptionHelper.CANCELED;
downstream.onError(throwable);
}
}

@Override
public void onComplete() {
Flow.Subscription s = upstream;
if (s != SubscriptionHelper.CANCELED) {
upstream = SubscriptionHelper.CANCELED;
boolean e = empty;
if (e) {
try {
ifEmpty.run();
} catch (Throwable t) {
downstream.onError(t);
return;
}
}
downstream.onComplete();
}
}

@Override
public void request(long n) {
upstream.request(n);
}

@Override
public void cancel() {
upstream.cancel();
upstream = SubscriptionHelper.CANCELED;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,17 @@ default Single<T> onTerminate(Runnable onTerminate) {
onTerminate);
}

/**
* Executes given {@link java.lang.Runnable} when stream is finished without value(empty stream).
*
* @param ifEmpty {@link java.lang.Runnable} to be executed.
* @return Multi
*/
default Single<T> ifEmpty(Runnable ifEmpty) {
Objects.requireNonNull(ifEmpty, "ifEmpty callback is null");
return new SingleIfEmptyPublisher<>(this, ifEmpty);
}

/**
* Invoke provided consumer for the item in stream.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright (c) 2021 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package io.helidon.common.reactive;

import java.util.Objects;
import java.util.concurrent.Flow;

/**
* Executes given {@link java.lang.Runnable} when stream is finished without value(empty stream).
*
* @param <T> the item type
*/
final class SingleIfEmptyPublisher<T> extends CompletionSingle<T> {

private final Single<T> source;
private final Runnable ifEmpty;

SingleIfEmptyPublisher(Single<T> source, Runnable ifEmpty) {
this.source = source;
this.ifEmpty = ifEmpty;
}

@Override
public void subscribe(Flow.Subscriber<? super T> subscriber) {
Objects.requireNonNull(subscriber, "subscriber is null");
source.subscribe(new MultiIfEmptyPublisher.IfEmptySubscriber<>(subscriber, ifEmpty));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Copyright (c) 2021 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package io.helidon.common.reactive;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

import org.junit.jupiter.api.Test;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;

public class MultiIfEmptyTest {

@Test
void empty() {
List<String> result = new ArrayList<>();
Multi.<String>empty()
.ifEmpty(() -> result.add("ifEmpty"))
.onComplete(() -> result.add("onComplete"))
.peek(result::add)
.onError(t -> result.add("onError"))
.ignoreElements();
assertThat(result, contains("ifEmpty", "onComplete"));
}

@Test
void multipleEmpty() {
List<String> result = new ArrayList<>();
Single.just(Optional.<String>empty())
.flatMapOptional(Function.identity())
.ifEmpty(() -> result.add("ifEmptyOptional"))
.flatMap(s -> Single.<String>empty())
.ifEmpty(() -> result.add("ifEmpty"))
.onComplete(() -> result.add("onComplete"))
.peek(result::add)
.onError(t -> result.add("onError"))
.ignoreElements();
assertThat(result, contains("ifEmptyOptional", "ifEmpty", "onComplete"));
}

@Test
void nonEmpty() {
List<String> result = new ArrayList<>();
Multi.just(1, 2, 3)
.map(String::valueOf)
.peek(result::add)
.ifEmpty(() -> result.add("ifEmpty"))
.onComplete(() -> result.add("onComplete"))
.onError(t -> result.add("onError"))
.ignoreElements();
assertThat(result, contains("1", "2", "3", "onComplete"));
}

@Test
void error() {
List<String> result = new ArrayList<>();
Multi.just(1, 2, 3)
.flatMap(i -> {
if (i == 3) {
return Single.error(new Exception("BOOM!"));
} else {
return Single.just(String.valueOf(i));
}
})
.peek(result::add)
.ifEmpty(() -> result.add("ifEmpty"))
.onComplete(() -> result.add("onComplete"))
.onError(t -> result.add("onError"))
.ignoreElements();
assertThat(result, contains("1", "2", "onError"));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Copyright (c) 2021 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package io.helidon.common.reactive;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

import org.junit.jupiter.api.Test;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;

public class SingleIfEmptyTest {

@Test
void empty() {
List<String> result = new ArrayList<>();
Single.<String>empty()
.ifEmpty(() -> result.add("ifEmpty"))
.onComplete(() -> result.add("onComplete"))
.peek(result::add)
.onError(t -> result.add("onError"))
.ignoreElement();
assertThat(result, contains("ifEmpty", "onComplete"));
}

@Test
void multipleEmpty() {
List<String> result = new ArrayList<>();
Single.just(Optional.<String>empty())
.flatMapOptional(Function.identity())
.ifEmpty(() -> result.add("ifEmptyOptional"))
.flatMapSingle(s -> Single.<String>empty())
.ifEmpty(() -> result.add("ifEmpty"))
.onComplete(() -> result.add("onComplete"))
.peek(result::add)
.onError(t -> result.add("onError"))
.ignoreElement();
assertThat(result, contains("ifEmptyOptional", "ifEmpty", "onComplete"));
}

@Test
void nonEmpty() {
List<String> result = new ArrayList<>();
Single.just(1)
.map(String::valueOf)
.peek(result::add)
.ifEmpty(() -> result.add("ifEmpty"))
.onComplete(() -> result.add("onComplete"))
.onError(t -> result.add("onError"))
.ignoreElement();
assertThat(result, contains("1", "onComplete"));
}

@Test
void error() {
List<String> result = new ArrayList<>();
Single.<String>error(new Exception("BOOM!"))
.peek(result::add)
.ifEmpty(() -> result.add("ifEmpty"))
.onComplete(() -> result.add("onComplete"))
.onError(t -> result.add("onError"))
.ignoreElement();
assertThat(result, contains("onError"));
}

}
1 change: 1 addition & 0 deletions docs/shared/reactivestreams/02_engine.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ Single.just("1")
|never|Get a `Multi` instance that never completes.
|concat|Concat streams to one.
|onTerminate|Executes given `java.lang.Runnable` when any of signals onComplete, onCancel or onError is received.
|ifEmpty|Executes given `java.lang.Runnable` when stream is finished without value(empty stream).
|onComplete|Executes given `java.lang.Runnable` when onComplete signal is received.
|onError|Executes the given java.util.function.Consumer when an onError signal is received.
|onCancel|Executes given `java.lang.Runnable` when a cancel signal is received.
Expand Down
1 change: 1 addition & 0 deletions docs/shared/reactivestreams/03_rsoperators.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ ReactiveStreams.of("1", "2", "3", "4", "5")
|onErrorResumeWithRsPublisher| When onError signal received continue emitting from supplied publisher
|onComplete| Invoke supplied runnable when onComplete signal received
|onTerminate| Invoke supplied runnable when onComplete or onError signal received
|ifEmpty| Executes given `java.lang.Runnable` when stream is finished without value(empty stream).
|to| Connect this stream to supplied subscriber
|toList| Collect all intercepted items to List
|collect| Collect all intercepted items with provided collector
Expand Down
Loading

0 comments on commit c9d9734

Please sign in to comment.