Introduce awaitAll and joinAll extensions for collections for Deferreds/Jobs #171

Closed
elizarov opened this issue Nov 29, 2017 · 10 comments

@elizarov
Contributor

joinAll is simply map { it.join() }, while awaitAll needs a slightly more involved implementation than a simple map { it.await() }: it should wait on all of the deferreds in the collection and crash as soon as any one of them crashes, whereas a sequential map would stay suspended on an earlier deferred even after a later one has already failed.
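A minimal sketch of the two functions as described, assuming current kotlinx.coroutines (which nowadays ships its own awaitAll and joinAll; the names below are hypothetical so they do not clash). The fail-fast behaviour comes from attaching one watcher coroutine per deferred inside coroutineScope: the first watcher whose await() throws fails the scope, which cancels the remaining watchers (but not the deferreds themselves).

```kotlin
import kotlinx.coroutines.*

// Hypothetical name: joins every job in turn. join() never rethrows a job's
// failure, so waiting sequentially is enough to wait for all of them.
suspend fun Collection<Job>.joinAllSketch() {
    for (job in this) job.join()
}

// Hypothetical name: waits for all deferreds, but fails as soon as any one fails
// instead of staying suspended on an earlier, still-running deferred.
suspend fun <T> Collection<Deferred<T>>.awaitAllSketch(): List<T> {
    coroutineScope {
        // One watcher per deferred; the first await() that throws a (non-cancellation)
        // exception fails this scope immediately and cancels the other watchers.
        for (deferred in this@awaitAllSketch) {
            launch { deferred.await() }
        }
    }
    // Reaching this point means every deferred has completed, so these return at once.
    return map { it.await() }
}
```

Waiting on completion first and collecting values second keeps the result order equal to the input order, regardless of which deferred finishes first.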

@fvasco
Contributor

fvasco commented Nov 29, 2017

joinAll is simple map { it.join() }

onEach should be considered instead, to avoid copying the collection.
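For illustration, the onEach variant might look like this (the name joinAllOnEach is hypothetical). Because the stdlib onEach is an inline function, the suspending join() call is allowed inside its lambda, and it returns the receiver itself rather than allocating the List<Unit> that map would build.

```kotlin
import kotlinx.coroutines.*

// Hypothetical name. onEach is inline, so the suspending join() may be called in its
// lambda; the same collection instance is returned, with no intermediate list.
suspend fun <C : Iterable<Job>> C.joinAllOnEach(): C = onEach { it.join() }
```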

@luisrayas3

luisrayas3 commented Nov 29, 2017

Related: I wrote this extension function which I use all the time:

import kotlinx.coroutines.*

/**
 * Performs [block] concurrently on each item in the iterable (with [async]), then awaits each deferred value.
 *
 * The coroutines are started inside [coroutineScope], so they inherit the caller's coroutine context
 * (the one returned by [coroutineContext]), and a failure in one of them cancels the others.
 */
suspend fun <T, R> Iterable<T>.mapAsync(block: suspend (T) -> R): List<R> =
    coroutineScope { map { async { block(it) } }.map { it.await() } }

edit: perhaps a more appropriate name would be mapConcurrent

@elizarov
Contributor Author

elizarov commented Nov 30, 2017

@luisrayas3 It would also be useful if a concurrent/parallel map supported limited parallelism. The idea is to leave it named map, but add an additional parallelism parameter. This is covered in #172.
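A sketch of such a limited-parallelism map, assuming kotlinx.coroutines.sync.Semaphore is used to cap concurrency. This is not the design from #172 (whose details are not shown here); mapConcurrently and its parallelism parameter are hypothetical names.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.*

// Hypothetical: like map, but at most `parallelism` transforms run at the same time.
// Results keep the order of the input, not the order of completion.
suspend fun <T, R> Iterable<T>.mapConcurrently(
    parallelism: Int,
    transform: suspend (T) -> R
): List<R> {
    require(parallelism > 0) { "parallelism must be positive, was $parallelism" }
    val permits = Semaphore(parallelism)
    return coroutineScope {
        // Every item gets its own coroutine up front, but only `parallelism` of them
        // can hold a permit (and thus run transform) at any moment.
        map { item -> async { permits.withPermit { transform(item) } } }.awaitAll()
    }
}
```

Starting all coroutines eagerly keeps the code short; for very large inputs a worker-pool design that pulls items from a channel would avoid creating one coroutine per item.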

@elizarov
Contributor Author

Let me also add an idea by Brian Parma from the public Slack: something like asCompleted for a list of deferreds, similar to Python's as_completed: https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.as_completed
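A minimal sketch of asCompleted for current kotlinx.coroutines, assuming a Flow is an acceptable return type (the signature is hypothetical). Like Python's as_completed, it yields the deferreds themselves in completion order, so the caller decides how to handle a failed one by calling await().

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical: emits each deferred as soon as it completes, successfully or not.
fun <T> Iterable<Deferred<T>>.asCompleted(): Flow<Deferred<T>> = channelFlow {
    for (deferred in this@asCompleted) {
        launch {
            // join() waits for completion but does not rethrow the deferred's failure.
            deferred.join()
            send(deferred)
        }
    }
    // channelFlow completes only after all coroutines launched in it have finished.
}
```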

@fvasco
Contributor

fvasco commented Nov 30, 2017

Some possible implementations, assuming parallelism-aware operators along the lines of #172:

suspend fun Iterable<Job>.joinAll(): Iterable<Job> = onEach(parallelism = 1) { it.join() }

suspend fun <E> Iterable<Deferred<E>>.awaitAll(): Iterable<E> = map(parallelism = 1) { it.await() }

fun <E> ReceiveChannel<Deferred<E>>.asCompleted(): ReceiveChannel<Deferred<E>> = onEach(parallelism = MAX) { it.join() }

@fogone

fogone commented Feb 4, 2018

I made a simple implementation of the mapAsync method. There are some problems with how the contexts are defined, but it works okay for my cases: https://gist.github.com/fogone/a0ffa2432fb8e87f1ea007add3d402aa

@raulraja

raulraja commented Feb 22, 2018

This is great!

It is possible, though, to implement this while preserving the arity of the different operations and the type information they may return, without coalescing their types into a list. How it can be done is not pretty (https://github.com/arrow-kt/arrow/blob/master/modules/core/arrow-syntax/src/main/kotlin/arrow/syntax/applicative/applicative.kt#L256), but Arrow users can already do it without casting:

import arrow.effects.*

data class Joined<A, B, C>(val a: A, val b: B, val c: C)

val op1: DeferredK<A> = async { ... }.k()
val op2: DeferredK<B> = async { ... }.k()
val op3: DeferredK<C> = async { ... }.k()

val results: DeferredK<Joined<A, B, C>> =
  DeferredK.applicative().map(op1, op2, op3, { (a, b, c) ->
      Joined(a, b, c)
  })

While traversing and sequencing over List<Deferred<A>> where A is fixed is useful, I think it's more useful to preserve type information when joining parallelizable results, because you often launch N parallel operations that each return a differently typed response. Not sure if this is within the design space of async/await, but as a user I would certainly like that feature.

@venkatperi
Contributor

@elizarov
Contributor Author

elizarov commented Apr 6, 2018

@raulraja We try to keep kotlinx.coroutines idiomatic with respect to the Kotlin stdlib. We don't plan to have any multiple-arity overloads. The idiomatic way to write your code is:

data class Joined<A, B, C>(val a: A, val b: B, val c: C)

val op1: Deferred<A> = async { ... }
val op2: Deferred<B> = async { ... }
val op3: Deferred<C> = async { ... }

val results = Joined(op1.await(), op2.await(), op3.await())
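A self-contained version of the snippet above, with hypothetical loadName, loadCount and loadFlag functions standing in for the elided work, and the async calls wrapped in coroutineScope as current kotlinx.coroutines requires. Each await() keeps its own static type, so no common supertype or cast is needed.

```kotlin
import kotlinx.coroutines.*

data class Joined<A, B, C>(val a: A, val b: B, val c: C)

// Hypothetical stand-ins for three independent operations with different result types.
suspend fun loadName(): String { delay(30); return "kotlin" }
suspend fun loadCount(): Int { delay(20); return 42 }
suspend fun loadFlag(): Boolean { delay(10); return true }

suspend fun loadAll(): Joined<String, Int, Boolean> = coroutineScope {
    // All three start immediately and run concurrently.
    val op1 = async { loadName() }
    val op2 = async { loadCount() }
    val op3 = async { loadFlag() }
    // If any of them fails, coroutineScope cancels the other two and rethrows.
    Joined(op1.await(), op2.await(), op3.await())
}
```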

@jablko

jablko commented May 27, 2021

Let me also add an idea by Brian Parma from public slack to have a something like asCompleted for a list of deferred similar to Python: https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.as_completed

I landed here while searching for a way to get the first in a collection of deferreds to complete with a non-null value. I settled on:

deferreds.map { it::await.asFlow() }.merge().filterNotNull().firstOrNull()
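The one-liner can be wrapped in a small helper to see it in context (firstNonNull is a hypothetical name). When firstOrNull stops collecting, it cancels only the pending await() calls, not the deferreds themselves, so any deferreds that are no longer needed still have to be cancelled by the caller.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper around the one-liner: returns the first non-null result in
// completion order, or null if every deferred completes with null.
suspend fun <T : Any> firstNonNull(deferreds: List<Deferred<T?>>): T? =
    deferreds
        .map { it::await.asFlow() }   // one single-element flow per deferred
        .merge()                      // values arrive in completion order
        .filterNotNull()
        .firstOrNull()                // stops collecting after the first match
```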

Not sure if this warrants a shortcut or a new issue, e.g.

/**
 * Creates a flow from a collection of deferreds. Elements are emitted in the order they complete.
 */
fun <T> Iterable<Deferred<T>>.merge(): Flow<T> = map { it::await.asFlow() }.merge()

or

fun <T> Iterable<Deferred<T>>.asCompleted(): Flow<T> = map { it::await.asFlow() }.merge()

or

fun <T> Iterable<Deferred<T>>.race(): Flow<T> = map { it::await.asFlow() }.merge()

Or does a better way already exist?
