Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2.x: Fix Generics T[] in Zip & CombineLatest #4525

Merged
merged 5 commits into from
Sep 10, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 62 additions & 12 deletions src/main/java/io/reactivex/Flowable.java
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,11 @@ public static int bufferSize() {
* Combines a collection of source Publishers by emitting an item that aggregates the latest values of each of
* the source Publishers each time an item is received from any of the source Publishers, where this
* aggregation is defined by a specified function.
* <p>
* Note on method signature: since Java doesn't allow creating a generic array with {@code new T[]}, the
* implementation of this operator has to create an {@code Object[]} instead. Unfortunately, a
* {@code Function<Integer[], R>} passed to the method would trigger a {@code ClassCastException}.
*
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The returned {@code Publisher} honors backpressure from downstream. The source {@code Publisher}s
Expand All @@ -163,14 +168,19 @@ public static int bufferSize() {
*/
@SchedulerSupport(SchedulerSupport.NONE)
@BackpressureSupport(BackpressureKind.FULL)
public static <T, R> Flowable<R> combineLatest(Publisher<? extends T>[] sources, Function<? super T[], ? extends R> combiner) {
public static <T, R> Flowable<R> combineLatest(Publisher<? extends T>[] sources, Function<? super Object[], ? extends R> combiner) {
Copy link
Member

@akarnokd akarnokd Sep 10, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might be worth adding a sentence to the javadocs since people tend to ask why this isn't ? super T[]

<p>
Note on method signature: since Java doesn't allow creating a generic array with {@code new T[]}, the 
implementation of this operator has to create an {@code Object[]} instead. Unfortunately, a 
{@code Function<Integer[], R>} passed to the method would trigger a {@code ClassCastException}.

If you feel so, you could add these two sentences after the first sentence of all the affected methods and before the marble diagram (as people forget to scroll down below the diagram).

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will do

return combineLatest(sources, combiner, bufferSize());
}

/**
* Combines a collection of source Publishers by emitting an item that aggregates the latest values of each of
* the source Publishers each time an item is received from any of the source Publishers, where this
* aggregation is defined by a specified function.
* <p>
* Note on method signature: since Java doesn't allow creating a generic array with {@code new T[]}, the
* implementation of this operator has to create an {@code Object[]} instead. Unfortunately, a
* {@code Function<Integer[], R>} passed to the method would trigger a {@code ClassCastException}.
*
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The returned {@code Publisher} honors backpressure from downstream. The source {@code Publisher}s
Expand All @@ -194,14 +204,19 @@ public static <T, R> Flowable<R> combineLatest(Publisher<? extends T>[] sources,
*/
@SchedulerSupport(SchedulerSupport.NONE)
@BackpressureSupport(BackpressureKind.FULL)
public static <T, R> Flowable<R> combineLatest(Function<? super T[], ? extends R> combiner, Publisher<? extends T>... sources) {
public static <T, R> Flowable<R> combineLatest(Function<? super Object[], ? extends R> combiner, Publisher<? extends T>... sources) {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this one does not seem necessary to be honest. It just has switched arguments with the one above. Or do I oversee anything?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This and other overloads of operators let's you use varargs to specify any number of sources at the expense that varargs has to be the last argument.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ups right didn't see those little dots there too many overloads ...

return combineLatest(sources, combiner, bufferSize());
}

/**
* Combines a collection of source Publishers by emitting an item that aggregates the latest values of each of
* the source Publishers each time an item is received from any of the source Publishers, where this
* aggregation is defined by a specified function.
* <p>
* Note on method signature: since Java doesn't allow creating a generic array with {@code new T[]}, the
* implementation of this operator has to create an {@code Object[]} instead. Unfortunately, a
* {@code Function<Integer[], R>} passed to the method would trigger a {@code ClassCastException}.
*
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The returned {@code Publisher} honors backpressure from downstream. The source {@code Publisher}s
Expand All @@ -227,7 +242,7 @@ public static <T, R> Flowable<R> combineLatest(Function<? super T[], ? extends R
*/
@SchedulerSupport(SchedulerSupport.NONE)
@BackpressureSupport(BackpressureKind.FULL)
public static <T, R> Flowable<R> combineLatest(Publisher<? extends T>[] sources, Function<? super T[], ? extends R> combiner, int bufferSize) {
public static <T, R> Flowable<R> combineLatest(Publisher<? extends T>[] sources, Function<? super Object[], ? extends R> combiner, int bufferSize) {
ObjectHelper.requireNonNull(sources, "sources is null");
if (sources.length == 0) {
return empty();
Expand All @@ -241,6 +256,11 @@ public static <T, R> Flowable<R> combineLatest(Publisher<? extends T>[] sources,
* Combines a collection of source Publishers by emitting an item that aggregates the latest values of each of
* the source Publishers each time an item is received from any of the source Publishers, where this
* aggregation is defined by a specified function.
* <p>
* Note on method signature: since Java doesn't allow creating a generic array with {@code new T[]}, the
* implementation of this operator has to create an {@code Object[]} instead. Unfortunately, a
* {@code Function<Integer[], R>} passed to the method would trigger a {@code ClassCastException}.
*
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The returned {@code Publisher} honors backpressure from downstream. The source {@code Publisher}s
Expand All @@ -265,14 +285,19 @@ public static <T, R> Flowable<R> combineLatest(Publisher<? extends T>[] sources,
@SchedulerSupport(SchedulerSupport.NONE)
@BackpressureSupport(BackpressureKind.FULL)
public static <T, R> Flowable<R> combineLatest(Iterable<? extends Publisher<? extends T>> sources,
Function<? super T[], ? extends R> combiner) {
Function<? super Object[], ? extends R> combiner) {
return combineLatest(sources, combiner, bufferSize());
}

/**
* Combines a collection of source Publishers by emitting an item that aggregates the latest values of each of
* the source Publishers each time an item is received from any of the source Publishers, where this
* aggregation is defined by a specified function.
* <p>
* Note on method signature: since Java doesn't allow creating a generic array with {@code new T[]}, the
* implementation of this operator has to create an {@code Object[]} instead. Unfortunately, a
* {@code Function<Integer[], R>} passed to the method would trigger a {@code ClassCastException}.
*
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The returned {@code Publisher} honors backpressure from downstream. The source {@code Publisher}s
Expand All @@ -299,7 +324,7 @@ public static <T, R> Flowable<R> combineLatest(Iterable<? extends Publisher<? ex
@SchedulerSupport(SchedulerSupport.NONE)
@BackpressureSupport(BackpressureKind.FULL)
public static <T, R> Flowable<R> combineLatest(Iterable<? extends Publisher<? extends T>> sources,
Function<? super T[], ? extends R> combiner, int bufferSize) {
Function<? super Object[], ? extends R> combiner, int bufferSize) {
ObjectHelper.requireNonNull(sources, "sources is null");
ObjectHelper.requireNonNull(combiner, "combiner is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
Expand All @@ -310,6 +335,11 @@ public static <T, R> Flowable<R> combineLatest(Iterable<? extends Publisher<? ex
* Combines a collection of source Publishers by emitting an item that aggregates the latest values of each of
* the source Publishers each time an item is received from any of the source Publishers, where this
* aggregation is defined by a specified function.
* <p>
* Note on method signature: since Java doesn't allow creating a generic array with {@code new T[]}, the
* implementation of this operator has to create an {@code Object[]} instead. Unfortunately, a
* {@code Function<Integer[], R>} passed to the method would trigger a {@code ClassCastException}.
*
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The returned {@code Publisher} honors backpressure from downstream. The source {@code Publisher}s
Expand All @@ -334,7 +364,7 @@ public static <T, R> Flowable<R> combineLatest(Iterable<? extends Publisher<? ex
@SchedulerSupport(SchedulerSupport.NONE)
@BackpressureSupport(BackpressureKind.FULL)
public static <T, R> Flowable<R> combineLatestDelayError(Publisher<? extends T>[] sources,
Function<? super T[], ? extends R> combiner) {
Function<? super Object[], ? extends R> combiner) {
return combineLatestDelayError(sources, combiner, bufferSize());
}

Expand All @@ -343,6 +373,10 @@ public static <T, R> Flowable<R> combineLatestDelayError(Publisher<? extends T>[
* the source Publishers each time an item is received from any of the source Publishers, where this
* aggregation is defined by a specified function and delays any error from the sources until
* all source Publishers terminate.
* <p>
* Note on method signature: since Java doesn't allow creating a generic array with {@code new T[]}, the
* implementation of this operator has to create an {@code Object[]} instead. Unfortunately, a
* {@code Function<Integer[], R>} passed to the method would trigger a {@code ClassCastException}.
*
* <dl>
* <dt><b>Backpressure:</b></dt>
Expand All @@ -367,7 +401,7 @@ public static <T, R> Flowable<R> combineLatestDelayError(Publisher<? extends T>[
*/
@SchedulerSupport(SchedulerSupport.NONE)
@BackpressureSupport(BackpressureKind.FULL)
public static <T, R> Flowable<R> combineLatestDelayError(Function<? super T[], ? extends R> combiner,
public static <T, R> Flowable<R> combineLatestDelayError(Function<? super Object[], ? extends R> combiner,
Publisher<? extends T>... sources) {
return combineLatestDelayError(sources, combiner, bufferSize());
}
Expand All @@ -377,6 +411,10 @@ public static <T, R> Flowable<R> combineLatestDelayError(Function<? super T[], ?
* the source ObservableSources each time an item is received from any of the source Publisher, where this
* aggregation is defined by a specified function and delays any error from the sources until
* all source Publishers terminate.
* <p>
* Note on method signature: since Java doesn't allow creating a generic array with {@code new T[]}, the
* implementation of this operator has to create an {@code Object[]} instead. Unfortunately, a
* {@code Function<Integer[], R>} passed to the method would trigger a {@code ClassCastException}.
*
* <dl>
* <dt><b>Scheduler:</b></dt>
Expand All @@ -398,7 +436,7 @@ public static <T, R> Flowable<R> combineLatestDelayError(Function<? super T[], ?
* @see <a href="http://reactivex.io/documentation/operators/combinelatest.html">ReactiveX operators documentation: CombineLatest</a>
*/
@SchedulerSupport(SchedulerSupport.NONE)
public static <T, R> Flowable<R> combineLatestDelayError(Function<? super T[], ? extends R> combiner,
public static <T, R> Flowable<R> combineLatestDelayError(Function<? super Object[], ? extends R> combiner,
int bufferSize, Publisher<? extends T>... sources) {
return combineLatestDelayError(sources, combiner, bufferSize);
}
Expand All @@ -408,6 +446,10 @@ public static <T, R> Flowable<R> combineLatestDelayError(Function<? super T[], ?
* the source Publishers each time an item is received from any of the source Publishers, where this
* aggregation is defined by a specified function and delays any error from the sources until
* all source Publishers terminate.
* <p>
* Note on method signature: since Java doesn't allow creating a generic array with {@code new T[]}, the
* implementation of this operator has to create an {@code Object[]} instead. Unfortunately, a
* {@code Function<Integer[], R>} passed to the method would trigger a {@code ClassCastException}.
*
* <dl>
* <dt><b>Backpressure:</b></dt>
Expand Down Expand Up @@ -435,7 +477,7 @@ public static <T, R> Flowable<R> combineLatestDelayError(Function<? super T[], ?
@SchedulerSupport(SchedulerSupport.NONE)
@BackpressureSupport(BackpressureKind.FULL)
public static <T, R> Flowable<R> combineLatestDelayError(Publisher<? extends T>[] sources,
Function<? super T[], ? extends R> combiner, int bufferSize) {
Function<? super Object[], ? extends R> combiner, int bufferSize) {
ObjectHelper.requireNonNull(sources, "sources is null");
ObjectHelper.requireNonNull(combiner, "combiner is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
Expand All @@ -450,6 +492,10 @@ public static <T, R> Flowable<R> combineLatestDelayError(Publisher<? extends T>[
* the source Publishers each time an item is received from any of the source Publishers, where this
* aggregation is defined by a specified function and delays any error from the sources until
* all source Publishers terminate.
* <p>
* Note on method signature: since Java doesn't allow creating a generic array with {@code new T[]}, the
* implementation of this operator has to create an {@code Object[]} instead. Unfortunately, a
* {@code Function<Integer[], R>} passed to the method would trigger a {@code ClassCastException}.
*
* <dl>
* <dt><b>Backpressure:</b></dt>
Expand All @@ -475,7 +521,7 @@ public static <T, R> Flowable<R> combineLatestDelayError(Publisher<? extends T>[
@SchedulerSupport(SchedulerSupport.NONE)
@BackpressureSupport(BackpressureKind.FULL)
public static <T, R> Flowable<R> combineLatestDelayError(Iterable<? extends Publisher<? extends T>> sources,
Function<? super T[], ? extends R> combiner) {
Function<? super Object[], ? extends R> combiner) {
return combineLatestDelayError(sources, combiner, bufferSize());
}

Expand All @@ -484,6 +530,10 @@ public static <T, R> Flowable<R> combineLatestDelayError(Iterable<? extends Publ
* the source Publishers each time an item is received from any of the source Publishers, where this
* aggregation is defined by a specified function and delays any error from the sources until
* all source Publishers terminate.
* <p>
* Note on method signature: since Java doesn't allow creating a generic array with {@code new T[]}, the
* implementation of this operator has to create an {@code Object[]} instead. Unfortunately, a
* {@code Function<Integer[], R>} passed to the method would trigger a {@code ClassCastException}.
*
* <dl>
* <dt><b>Backpressure:</b></dt>
Expand Down Expand Up @@ -511,7 +561,7 @@ public static <T, R> Flowable<R> combineLatestDelayError(Iterable<? extends Publ
@SchedulerSupport(SchedulerSupport.NONE)
@BackpressureSupport(BackpressureKind.FULL)
public static <T, R> Flowable<R> combineLatestDelayError(Iterable<? extends Publisher<? extends T>> sources,
Function<? super T[], ? extends R> combiner, int bufferSize) {
Function<? super Object[], ? extends R> combiner, int bufferSize) {
ObjectHelper.requireNonNull(sources, "sources is null");
ObjectHelper.requireNonNull(combiner, "combiner is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
Expand Down Expand Up @@ -14955,4 +15005,4 @@ public final TestSubscriber<T> test(long initialRequest, boolean cancel) { // No
return ts;
}

}
}
13 changes: 12 additions & 1 deletion src/main/java/io/reactivex/Maybe.java
Original file line number Diff line number Diff line change
Expand Up @@ -1343,6 +1343,11 @@ public static <T> Maybe<T> wrap(MaybeSource<T> source) {
* Returns a Maybe that emits the results of a specified combiner function applied to combinations of
* items emitted, in sequence, by an Iterable of other MaybeSources.
* <p>
* Note on method signature: since Java doesn't allow creating a generic array with {@code new T[]}, the
* implementation of this operator has to create an {@code Object[]} instead. Unfortunately, a
* {@code Function<Integer[], R>} passed to the method would trigger a {@code ClassCastException}.
*
* <p>
* <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/zip.png" alt="">
* <p>This operator terminates eagerly if any of the source MaybeSources signal an onError or onComplete. This
* also means it is possible some sources may not get subscribed to at all.
Expand All @@ -1362,7 +1367,7 @@ public static <T> Maybe<T> wrap(MaybeSource<T> source) {
* @see <a href="http://reactivex.io/documentation/operators/zip.html">ReactiveX operators documentation: Zip</a>
*/
@SchedulerSupport(SchedulerSupport.NONE)
public static <T, R> Maybe<R> zip(Iterable<? extends MaybeSource<? extends T>> sources, Function<? super T[], ? extends R> zipper) {
public static <T, R> Maybe<R> zip(Iterable<? extends MaybeSource<? extends T>> sources, Function<? super Object[], ? extends R> zipper) {
ObjectHelper.requireNonNull(zipper, "zipper is null");
ObjectHelper.requireNonNull(sources, "sources is null");
return RxJavaPlugins.onAssembly(new MaybeZipIterable<T, R>(sources, zipper));
Expand Down Expand Up @@ -1774,6 +1779,11 @@ public static <T1, T2, T3, T4, T5, T6, T7, T8, T9, R> Maybe<R> zip(
* Returns a Maybe that emits the results of a specified combiner function applied to combinations of
* items emitted, in sequence, by an array of other MaybeSources.
* <p>
* Note on method signature: since Java doesn't allow creating a generic array with {@code new T[]}, the
* implementation of this operator has to create an {@code Object[]} instead. Unfortunately, a
* {@code Function<Integer[], R>} passed to the method would trigger a {@code ClassCastException}.
*
* <p>
* <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/zip.png" alt="">
* <p>This operator terminates eagerly if any of the source MaybeSources signal an onError or onComplete. This
* also means it is possible some sources may not get subscribed to at all.
Expand All @@ -1796,6 +1806,7 @@ public static <T1, T2, T3, T4, T5, T6, T7, T8, T9, R> Maybe<R> zip(
@SchedulerSupport(SchedulerSupport.NONE)
public static <T, R> Maybe<R> zipArray(Function<? super Object[], ? extends R> zipper,
MaybeSource<? extends T>... sources) {
ObjectHelper.requireNonNull(sources, "sources is null");
if (sources.length == 0) {
return empty();
}
Expand Down
Loading