Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Zip list of reactive streams #222

Open
wants to merge 4 commits into
base: 2.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions src/main/kotlin/io/reactivex/rxkotlin/Flowables.kt
Original file line number Diff line number Diff line change
Expand Up @@ -367,3 +367,17 @@ inline fun <T : Any, U : Any, R : Any> Flowable<T>.zipWith(
@SchedulerSupport(SchedulerSupport.NONE)
fun <T : Any, U : Any> Flowable<T>.zipWith(other: Publisher<U>): Flowable<Pair<T, U>> =
zipWith(other, BiFunction { t, u -> Pair(t, u) })

/**
* Converts a list of flowables to a flowable list
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
fun <T: Any> List<Flowable<T>>.zipFlowables(): Flowable<List<T>> {
if (isEmpty()) return Flowable.just(emptyList())

return Flowable.zip(this) {
@Suppress("UNCHECKED_CAST")
return@zip (it as Array<T>).toList()
}
}
12 changes: 12 additions & 0 deletions src/main/kotlin/io/reactivex/rxkotlin/Maybes.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package io.reactivex.rxkotlin

import io.reactivex.Maybe
import io.reactivex.MaybeSource
import io.reactivex.Single
import io.reactivex.annotations.CheckReturnValue
import io.reactivex.annotations.SchedulerSupport
import io.reactivex.functions.*
Expand Down Expand Up @@ -130,3 +131,14 @@ inline fun <T : Any, U : Any, R : Any> Maybe<T>.zipWith(
@SchedulerSupport(SchedulerSupport.NONE)
fun <T : Any, U : Any> Maybe<T>.zipWith(other: MaybeSource<U>): Maybe<Pair<T, U>> =
zipWith(other, BiFunction { t, u -> Pair(t, u) })

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
fun <T: Any> List<Maybe<T>>.zipMaybes(): Maybe<List<T>> {
if (isEmpty()) return Maybe.just(emptyList())

return Maybe.zip(this) {
@Suppress("UNCHECKED_CAST")
return@zip (it as Array<T>).toList()
}
}
14 changes: 14 additions & 0 deletions src/main/kotlin/io/reactivex/rxkotlin/Observables.kt
Original file line number Diff line number Diff line change
Expand Up @@ -301,3 +301,17 @@ inline fun <T : Any, U : Any, R : Any> Observable<T>.zipWith(
@SchedulerSupport(SchedulerSupport.NONE)
fun <T : Any, U : Any> Observable<T>.zipWith(other: ObservableSource<U>): Observable<Pair<T, U>> =
zipWith(other, BiFunction { t, u -> Pair(t, u) })

/**
* Converts a list of observables to an observable list
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
fun <T: Any> List<Observable<T>>.zipObservables(): Observable<List<T>> {
if (isEmpty()) return Observable.just(emptyList())

return Observable.zip(this) {
@Suppress("UNCHECKED_CAST")
return@zip (it as Array<T>).toList()
}
}
11 changes: 11 additions & 0 deletions src/main/kotlin/io/reactivex/rxkotlin/Singles.kt
Original file line number Diff line number Diff line change
Expand Up @@ -123,3 +123,14 @@ inline fun <T : Any, U : Any, R : Any> Single<T>.zipWith(
@SchedulerSupport(SchedulerSupport.NONE)
fun <T : Any, U : Any> Single<T>.zipWith(other: SingleSource<U>): Single<Pair<T, U>> =
zipWith(other, BiFunction { t, u -> Pair(t, u) })

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
fun <T: Any> List<Single<T>>.zipSingles(): Single<List<T>> {
if (isEmpty()) return Single.just(emptyList())

return Single.zip(this) {
@Suppress("UNCHECKED_CAST")
return@zip (it as Array<T>).toList()
}
}
30 changes: 30 additions & 0 deletions src/test/kotlin/io/reactivex/rxkotlin/FlowablesTest.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package io.reactivex.rxkotlin

import io.reactivex.Flowable
import org.junit.Test

class FlowablesTest : KotlinTests() {

@Test fun zipFlowablesWithEmptyListReturnsEmptyList() {
val flowables = emptyList<Flowable<Int>>()

val zippedFlowables = flowables.zipFlowables().blockingFirst()

assert(zippedFlowables.isEmpty())
}

@Test fun zipObservablesWithNonEmptyListReturnsNonEmptyListWithCorrectElements() {
val flowables = listOf(
Flowable.just(1),
Flowable.just(2),
Flowable.just(3)
)

val zippedFlowables = flowables.zipFlowables().blockingFirst()

assert(zippedFlowables.size == 3)
assert(zippedFlowables[0] == 1)
assert(zippedFlowables[1] == 2)
assert(zippedFlowables[2] == 3)
}
}
32 changes: 32 additions & 0 deletions src/test/kotlin/io/reactivex/rxkotlin/MaybesTest.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package io.reactivex.rxkotlin

import io.reactivex.Maybe
import org.junit.Test

class MaybesTest : KotlinTests() {

@Test
fun zipMaybesWithEmptyListReturnsEmptyList() {
val maybes = emptyList<Maybe<Int>>()

val zippedMaybes = maybes.zipMaybes().blockingGet()

assert(zippedMaybes.isEmpty())
}

@Test
fun zipMaybesWithNonEmptyListReturnsNonEmptyListWithCorrectElements() {
val maybes = listOf(
Maybe.just(1),
Maybe.just(2),
Maybe.just(3)
)

val zippedMaybes = maybes.zipMaybes().blockingGet()

assert(zippedMaybes.size == 3)
assert(zippedMaybes[0] == 1)
assert(zippedMaybes[1] == 2)
assert(zippedMaybes[2] == 3)
}
}
23 changes: 23 additions & 0 deletions src/test/kotlin/io/reactivex/rxkotlin/ObservablesTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,27 @@ class ObservablesTest {

assertEquals(triple, result)
}

@Test fun zipObservablesWithEmptyListReturnsEmptyList() {
val observables = emptyList<Observable<Int>>()

val zippedObservables = observables.zipObservables().blockingFirst()

assert(zippedObservables.isEmpty())
}

@Test fun zipObservablesWithNonEmptyListReturnsNonEmptyListWithCorrectElements() {
val observables = listOf(
Observable.just(1),
Observable.just(2),
Observable.just(3)
)

val zippedObservables = observables.zipObservables().blockingFirst()

assert(zippedObservables.size == 3)
assert(zippedObservables[0] == 1)
assert(zippedObservables[1] == 2)
assert(zippedObservables[2] == 3)
}
}
23 changes: 23 additions & 0 deletions src/test/kotlin/io/reactivex/rxkotlin/SinglesTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,29 @@ class SinglesTest : KotlinTests() {
assert(nine == 9, { -> "Should equal nine"})
}).blockingGet()
}

@Test fun zipSinglesWithEmptyListReturnsEmptyList() {
val singles = emptyList<Single<Int>>()

val zippedSingles = singles.zipSingles().blockingGet()

assert(zippedSingles.isEmpty())
}

@Test fun zipSinglesWithNonEmptyListReturnsNonEmptyListWithCorrectElements() {
val singles = listOf(
Single.just(1),
Single.just(2),
Single.just(3)
)

val zippedSingles = singles.zipSingles().blockingGet()

assert(zippedSingles.size == 3)
assert(zippedSingles[0] == 1)
assert(zippedSingles[1] == 2)
assert(zippedSingles[2] == 3)
}
}

fun SingleSourceInt(i: Int): SingleSource<Int> {
Expand Down