Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Reactive] Implement reduce() + TCK tests #1504

Merged
merged 2 commits into from
Mar 13, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.concurrent.Flow.Publisher;
import java.util.concurrent.Flow.Subscriber;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
Expand Down Expand Up @@ -241,6 +242,40 @@ default <A, R> Single<R> collectStream(java.util.stream.Collector<T, A, R> colle
return new MultiCollectorPublisher<>(this, collector);
}

/**
* Combine subsequent items via a callback function and emit
* the final value result as a Single.
* <p>
* If the upstream is empty, the resulting Single is also empty.
* If the upstream contains only one item, the reducer function
* is not invoked and the resulting Single will have only that
* single item.
* </p>
* @param reducer the function called with the first value or the previous result,
* the current upstream value and should return a new value
* @return Single
*/
default Single<T> reduce(BiFunction<T, T, T> reducer) {
Objects.requireNonNull(reducer, "reducer is null");
return new MultiReduce<>(this, reducer);
}

/**
* Combine every upstream item with an accumulator value to produce a new accumulator
* value and emit the final accumulator value as a Single.
* @param supplier the function to return the initial accumulator value for each incoming
* Subscriber
* @param reducer the function that receives the current accumulator value, the current
* upstream value and should return a new accumulator value
* @param <R> the accumulator and result type
* @return Single
*/
default <R> Single<R> reduce(Supplier<? extends R> supplier, BiFunction<R, T, R> reducer) {
Objects.requireNonNull(supplier, "supplier is null");
Objects.requireNonNull(reducer, "reducer is null");
return new MultiReduceFull<>(this, supplier, reducer);
}

/**
* Get the first item of this {@link Multi} instance as a {@link Single}.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
/*
* Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package io.helidon.common.reactive;

import java.util.Objects;
import java.util.concurrent.Flow;
import java.util.function.BiFunction;

/**
* Combine subsequent items via a callback function and emit
* the result as a Single.
* @param <T> the element type of the source and result
*/
final class MultiReduce<T> implements Single<T> {

private final Multi<T> source;

private final BiFunction<T, T, T> reducer;

MultiReduce(Multi<T> source, BiFunction<T, T, T> reducer) {
this.source = source;
this.reducer = reducer;
}

@Override
public void subscribe(Flow.Subscriber<? super T> subscriber) {
source.subscribe(new ReducerSubscriber<>(subscriber, reducer));
}

static final class ReducerSubscriber<T> extends DeferredScalarSubscription<T>
implements Flow.Subscriber<T> {

private final BiFunction<T, T, T> reducer;

private Flow.Subscription upstream;

private T current;

ReducerSubscriber(Flow.Subscriber<? super T> downstream, BiFunction<T, T, T> reducer) {
super(downstream);
this.reducer = reducer;
}

@Override
public void onSubscribe(Flow.Subscription subscription) {
SubscriptionHelper.validate(upstream, subscription);
upstream = subscription;
// FIXME subscribeSelf()
downstream().onSubscribe(this);
subscription.request(Long.MAX_VALUE);
}

@Override
public void onNext(T item) {
Flow.Subscription s = upstream;
if (s != SubscriptionHelper.CANCELED) {
T current = this.current;
if (current == null) {
this.current = item;
} else {
try {
this.current = Objects.requireNonNull(reducer.apply(current, item),
"The reducer returned a null item");
} catch (Throwable ex) {
s.cancel();
onError(ex);
}
}
}
}

@Override
public void onError(Throwable throwable) {
if (upstream != SubscriptionHelper.CANCELED) {
upstream = SubscriptionHelper.CANCELED;
current = null;
// FIXME error()
downstream().onError(throwable);
}
}

@Override
public void onComplete() {
if (upstream != SubscriptionHelper.CANCELED) {
upstream = SubscriptionHelper.CANCELED;
T current = this.current;
this.current = null;
if (current == null) {
// FIXME complete()
downstream().onComplete();
} else {
complete(current);
}
}
}

@Override
public void cancel() {
super.cancel();
upstream.cancel();
upstream = SubscriptionHelper.CANCELED;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
/*
* Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package io.helidon.common.reactive;

import java.util.Objects;
import java.util.concurrent.Flow;
import java.util.function.BiFunction;
import java.util.function.Supplier;

/**
* Combine items via an accumulator and function into a single value.
* @param <T> the source value type
* @param <R> the accumulator and result type
*/
final class MultiReduceFull<T, R> implements Single<R> {

private final Multi<T> source;

private final Supplier<? extends R> supplier;

private final BiFunction<R, T, R> reducer;

MultiReduceFull(Multi<T> source, Supplier<? extends R> supplier, BiFunction<R, T, R> reducer) {
this.source = source;
this.supplier = supplier;
this.reducer = reducer;
}

@Override
public void subscribe(Flow.Subscriber<? super R> subscriber) {
R initial;
try {
initial = Objects.requireNonNull(supplier.get(),
"The supplier returned a null item");
} catch (Throwable ex) {
subscriber.onSubscribe(EmptySubscription.INSTANCE);
subscriber.onError(ex);
return;
}
source.subscribe(new ReduceFullSubscriber<>(subscriber, initial, reducer));
}

static final class ReduceFullSubscriber<T, R> extends DeferredScalarSubscription<R>
implements Flow.Subscriber<T> {

private final BiFunction<R, T, R> reducer;

private R accumulator;

private Flow.Subscription upstream;

ReduceFullSubscriber(Flow.Subscriber<? super R> downstream, R initial, BiFunction<R, T, R> reducer) {
super(downstream);
this.reducer = reducer;
this.accumulator = initial;
}

@Override
public void onSubscribe(Flow.Subscription subscription) {
SubscriptionHelper.validate(upstream, subscription);
upstream = subscription;
// FIXME subscribeSelf()
downstream().onSubscribe(this);
subscription.request(Long.MAX_VALUE);
}

@Override
public void onNext(T item) {
Flow.Subscription s = upstream;
if (s != SubscriptionHelper.CANCELED) {
try {
accumulator = Objects.requireNonNull(reducer.apply(accumulator, item),
"The reducer returned a null item");
} catch (Throwable ex) {
s.cancel();
onError(ex);
}
}
}

@Override
public void onError(Throwable throwable) {
if (upstream != SubscriptionHelper.CANCELED) {
upstream = SubscriptionHelper.CANCELED;
accumulator = null;
// FIXME error()
downstream().onError(throwable);
}
}

@Override
public void onComplete() {
if (upstream != SubscriptionHelper.CANCELED) {
upstream = SubscriptionHelper.CANCELED;
R accumulator = this.accumulator;
this.accumulator = null;
complete(accumulator);
}
}

@Override
public void cancel() {
super.cancel();
upstream.cancel();
upstream = SubscriptionHelper.CANCELED;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package io.helidon.common.reactive;

import org.reactivestreams.tck.TestEnvironment;
import org.reactivestreams.tck.flow.FlowPublisherVerification;
import org.testng.annotations.Test;

import java.util.concurrent.Flow;

@Test
public class MultiReduceFullTckTest extends FlowPublisherVerification<Integer> {

public MultiReduceFullTckTest() {
super(new TestEnvironment(50));
}

@Override
public Flow.Publisher<Integer> createFlowPublisher(long l) {
return Multi.range(1, 10).reduce(() -> 0, Integer::sum);
}

@Override
public Flow.Publisher<Integer> createFailedFlowPublisher() {
return null;
}

@Override
public long maxElementsFromPublisher() {
return 1;
}
}
Loading