Skip to content

Commit

Permalink
Decouple stream operators from Function interface. (#4711)
Browse files Browse the repository at this point in the history
This allows a single class to implement itself as an operator for all stream types. A similar change was recently made to the transformer types.
  • Loading branch information
JakeWharton authored and akarnokd committed Oct 15, 2016
1 parent 637978c commit fe4acf2
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 25 deletions.
14 changes: 8 additions & 6 deletions src/main/java/io/reactivex/CompletableOperator.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@

package io.reactivex;

import io.reactivex.functions.Function;

/**
* Convenience interface and callback used by the lift operator that given a child CompletableSubscriber,
* return a parent CompletableSubscriber that does any kind of lifecycle-related transformations.
* Interface to map/wrap a downstream observer to an upstream observer.
*/
public interface CompletableOperator extends Function<CompletableObserver, CompletableObserver> {

public interface CompletableOperator {
/**
* Applies a function to the child CompletableObserver and returns a new parent CompletableObserver.
* @param observer the child CompletableObservable instance
* @return the parent CompletableObserver instance
*/
CompletableObserver apply(CompletableObserver observer) throws Exception;
}
11 changes: 7 additions & 4 deletions src/main/java/io/reactivex/FlowableOperator.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,17 @@

import org.reactivestreams.Subscriber;

import io.reactivex.functions.Function;

/**
* Interface to map/wrap a downstream subscriber to an upstream subscriber.
*
* @param <Downstream> the value type of the downstream
* @param <Upstream> the value type of the upstream
*/
public interface FlowableOperator<Downstream, Upstream> extends Function<Subscriber<? super Downstream>, Subscriber<? super Upstream>> {

public interface FlowableOperator<Downstream, Upstream> {
/**
* Applies a function to the child Subscriber and returns a new parent Subscriber.
* @param observer the child Subscriber instance
* @return the parent Subscriber instance
*/
Subscriber<? super Upstream> apply(Subscriber<? super Downstream> observer) throws Exception;
}
13 changes: 8 additions & 5 deletions src/main/java/io/reactivex/MaybeOperator.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,17 @@

package io.reactivex;

import io.reactivex.functions.Function;

/**
* Interface to map/wrap a downstream subscriber to an upstream MaybeObserver.
* Interface to map/wrap a downstream observer to an upstream observer.
*
* @param <Downstream> the value type of the downstream
* @param <Upstream> the value type of the upstream
*/
public interface MaybeOperator<Downstream, Upstream> extends Function<MaybeObserver<? super Downstream>, MaybeObserver<? super Upstream>> {

public interface MaybeOperator<Downstream, Upstream> {
/**
* Applies a function to the child MaybeObserver and returns a new parent MaybeObserver.
* @param observer the child MaybeObserver instance
* @return the parent MaybeObserver instance
*/
MaybeObserver<? super Upstream> apply(MaybeObserver<? super Downstream> observer) throws Exception;
}
13 changes: 8 additions & 5 deletions src/main/java/io/reactivex/ObservableOperator.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,17 @@

package io.reactivex;

import io.reactivex.functions.Function;

/**
* Interface to map/wrap a downstream subscriber to an upstream Observer.
* Interface to map/wrap a downstream observer to an upstream observer.
*
* @param <Downstream> the value type of the downstream
* @param <Upstream> the value type of the upstream
*/
public interface ObservableOperator<Downstream, Upstream> extends Function<Observer<? super Downstream>, Observer<? super Upstream>> {

public interface ObservableOperator<Downstream, Upstream> {
/**
* Applies a function to the child Observer and returns a new parent Observer.
* @param observer the child Observer instance
* @return the parent Observer instance
*/
Observer<? super Upstream> apply(Observer<? super Downstream> observer) throws Exception;
}
13 changes: 8 additions & 5 deletions src/main/java/io/reactivex/SingleOperator.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,17 @@

package io.reactivex;

import io.reactivex.functions.Function;

/**
* Interface to map/wrap a downstream subscriber to an upstream SingleObserver.
* Interface to map/wrap a downstream observer to an upstream observer.
*
* @param <Downstream> the value type of the downstream
* @param <Upstream> the value type of the upstream
*/
public interface SingleOperator<Downstream, Upstream> extends Function<SingleObserver<? super Downstream>, SingleObserver<? super Upstream>> {

public interface SingleOperator<Downstream, Upstream> {
/**
* Applies a function to the child SingleObserver and returns a new parent SingleObserver.
* @param observer the child SingleObserver instance
* @return the parent SingleObserver instance
*/
SingleObserver<? super Upstream> apply(SingleObserver<? super Downstream> observer) throws Exception;
}

0 comments on commit fe4acf2

Please sign in to comment.