awaitFirst function #424

Closed
fvasco opened this issue Jul 5, 2018 · 15 comments


@fvasco
Contributor

fvasco commented Jul 5, 2018

The core library lacks an equivalent of Promise.race.

I propose a small POC of joinFirst/awaitFirst, or of a more generic selectFirst.

What do you think?

import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.selects.SelectClause0
import kotlinx.coroutines.experimental.selects.SelectClause1
import kotlinx.coroutines.experimental.selects.select
import kotlin.reflect.KProperty1

fun main(args: Array<String>) = runBlocking {
    val jobs = listOf(launch { delay(3) }, launch { delay(4) }, launch { delay(5) })
    jobs.selectFirst(Job::onJoin) {
        println("$it done")
    }

    val asyncs = listOf(async { 3 }, async { 4 }, async { 5 })
    asyncs.selectFirst(Deferred<Int>::onAwait) { deferred, value ->
        println("$deferred done with result $value")
    }

    println("First result is " + asyncs.awaitFirst())
}

// common functions

suspend fun <E : Job> Iterable<E>.joinFirst(): E = select {
    for (job in this@joinFirst) {
        job.onJoin { job }
    }
}

suspend fun <E : Deferred<R>, R> Iterable<E>.awaitFirst(): R = joinFirst().getCompleted()

// generic functions

suspend fun <E, R> Iterable<E>.selectFirst(selectProperty: KProperty1<E, SelectClause0>, block: (E) -> R): R =
        select {
            for (task in this@selectFirst) {
                selectProperty.get(task).invoke { block(task) }
            }
        }

suspend fun <E, V, R> Iterable<E>.selectFirst(selectProperty: KProperty1<E, SelectClause1<V>>, block: (E, V) -> R): R =
        select {
            for (task in this@selectFirst) {
                selectProperty.get(task).invoke { block(task, it) }
            }
        }
@fvasco
Contributor Author

fvasco commented Jul 5, 2018

POC combined with #410

fun main(args: Array<String>) = runBlocking {
    val result = job {
        val fetch1 = fork { longRunJob(1) }
        val fetch2 = fork { longRunJob(2) }
        listOf(fetch1, fetch2).awaitFirst()
    }

    println("The winner is $result")
}

suspend fun <R> job(parent: Job? = null, block: suspend Job.() -> R): R {
    val scope = Job(parent)
    try {
        return scope.block()
    } finally {
        scope.cancel()
    }
}

suspend fun <R> Job.fork(block: suspend () -> R): Deferred<R> = async(parent = this) { block() }

suspend fun longRunJob(i: Int): Int = i // TODO

@qwwdfsad
Collaborator

qwwdfsad commented Jul 6, 2018

I think it lacks use cases and is not very reliable: who will be responsible for checking the rest of the jobs for exceptions or cancellations? The value of such an extension in a real-world application is unclear as well.

I was never satisfied with the rationale behind Promise.race and CompletableFuture.anyOf, especially in the coroutines world (as opposed to the callbacks world), where cancellation and timeouts can be managed externally and propagated automatically.

Let's see whether there is demand for such an extension, though.
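One way to address the "who checks the rest of the jobs" concern is to have the race itself cancel every loser. The following is a minimal sketch against current kotlinx.coroutines 1.x APIs (not the 2018 experimental API used in this thread); `awaitFirstCancellingOthers` is a hypothetical name, not a library function.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.selects.select

// Hypothetical helper, not part of kotlinx.coroutines: returns the result of the first
// Deferred to complete and cancels all the others, so no "loser" keeps running unobserved.
// If the first one to complete failed, onAwait rethrows its exception; the finally block
// still cancels the others before the exception leaves this function.
suspend fun <T> List<Deferred<T>>.awaitFirstCancellingOthers(): T {
    require(isNotEmpty()) { "Cannot race an empty list" }
    try {
        return select {
            for (deferred in this@awaitFirstCancellingOthers) {
                deferred.onAwait { it }
            }
        }
    } finally {
        // Cancelling an already completed Deferred is a no-op.
        forEach { it.cancel() }
    }
}
```

Running the racers inside `coroutineScope { ... }` keeps them structured: the scope does not return until the cancelled losers have actually finished.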

@fvasco
Contributor Author

fvasco commented Jul 7, 2018

Hi @qwwdfsad,
you are right.
I propose another operator here; I could not find it in the core library, and hopefully it will be useful.

It is more generic, so awaitFirst becomes trivial.
The idea is: given a collection of Jobs, I want to receive each of them as it completes.

Here is a POC:

fun <E : Job> Iterable<E>.joinChannel(): ReceiveChannel<E> = JoinChannel(this)

private class JoinChannel<E : Job>(jobs: Iterable<E>) : LinkedListChannel<E>() {
    /**
     * Pending jobs count. May be negative in initialization phase
     */
    val _pendingJobCount = atomic(0)
    @Volatile
    var _disposableHandles: List<DisposableHandle>?

    init {
        val disposableHandles =
                jobs.mapNotNull { job ->
                    if (job.isCompleted) {
                        offer(job)
                        null
                    } else {
                        job.invokeOnCompletion { onJobCompletion(job) }
                    }
                }

        if (disposableHandles.isEmpty()) {
            _disposableHandles = null
            close()
        } else {
            _disposableHandles = disposableHandles
            val pending = _pendingJobCount.addAndGet(disposableHandles.size)
            if (pending == 0) close()
        }
    }


    fun onJobCompletion(job: E) {
        try {
            offer(job)
            val pending = _pendingJobCount.decrementAndGet()
            if (pending == 0) {
                _disposableHandles = null
                close()
            }
        } catch (_: Exception) {
            // ignore exception if output channel is closed
        }
    }

    override fun cleanupSendQueueOnCancel() {
        _disposableHandles?.apply {
            _disposableHandles = null
            forEach(DisposableHandle::dispose)
        }
    }
}

@qwwdfsad
Collaborator

qwwdfsad commented Jul 8, 2018

Could you please elaborate on why one needs this operator?
What kinds of tasks can it be used for, and why is it impossible for the Job itself to emit the item to the channel?

@fvasco
Contributor Author

fvasco commented Jul 8, 2018

Good catch @qwwdfsad

why is it impossible for the Job itself to emit the item to the channel?

Should I do that?
It is clearly possible, but then the suspension points in the Jobs would no longer be in tail position.

I prefer a different approach to this problem, i.e. a regular map/reduce.

suspend fun build(vararg parts: Part): Result =
        parts
                .map { async { buildPart(it) } }
                .joinChannel()
                .map { it.await() }
                .reduce { acc, result -> acc + result }

It is possible to implement this function using something like awaitAll, but then I have to await all the Deferreds before reducing them.
Finally, I am confident that joinChannel is quite versatile (e.g. combined with first or find).
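For comparison, here is a minimal sketch of "reduce as results arrive" with today's Flow API, in which each task sends its own result into the channel (the approach qwwdfsad suggested) and `reduce` folds the results in completion order. `reduceAsCompleted` is a hypothetical name, and the sketch assumes kotlinx.coroutines 1.x.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical sketch: every task runs concurrently inside channelFlow and sends its own
// result as soon as it finishes; reduce() then folds results in completion order, so
// fast parts are combined without waiting for the slowest one.
// Throws NoSuchElementException if `tasks` is empty (reduce needs at least one value).
suspend fun <T> reduceAsCompleted(tasks: List<suspend () -> T>, operation: (T, T) -> T): T =
    channelFlow<T> {
        for (task in tasks) {
            launch { send(task()) }
        }
    }.reduce { acc, value -> operation(acc, value) }
```

Because the folding order depends on timing, `operation` should be associative and commutative (like `+` on numbers) if the result must be deterministic.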

@fvasco
Contributor Author

fvasco commented Jul 13, 2018

I am documenting my use case here for future reference.

I have to download a page from n hosts (often two or three). The page is the same on every host, so any of them will do, preferably the fastest to retrieve.
The library I have to use returns a result on success or throws an error on failure; sometimes it hangs (an HTTPS issue).

I moved the timeout handling outside, so I only have to pick the fastest page.

A solution is:

fun main(args: Array<String>) = runBlocking {
    val job = Job(coroutineContext[Job])
    try {
        val context = coroutineContext + job
        val loaders =
                args
                        .mapAsync(context) { fetch(it) }
                        .toMutableList()

        while (loaders.isNotEmpty()) {
            val data: String? = select {
                for (verifier in loaders) {
                    verifier.onJoin {
                        loaders.remove(verifier)
                        if (verifier.isCompletedExceptionally) null
                        else verifier.getCompleted()
                    }
                }
            }

            if (data != null) {
                println("Found $data")
                return@runBlocking // stop at the first successful loader; finally cancels the rest
            }
        }

        println("Data not found")
    } finally {
        job.cancel()
    }
}

fun <T, R> Array<out T>.mapAsync(context: CoroutineContext, transform: suspend CoroutineScope.(T) -> R): List<Deferred<R>> =
        map { async(context) { transform(it) } }

Using joinChannel:

fun main(args: Array<String>) = runBlocking {
    val job = Job(coroutineContext[Job])
    try {
        val context = coroutineContext + job
        val loaders = args.mapAsync(context) { fetch(it) }

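        val data = loaders
                .joinChannel()
                // keep only the loaders that completed successfully
                .map { if (it.isCompletedExceptionally) null else it.getCompleted() }
                .filterNotNull()
                .firstOrNull()

        if (data != null) println("Found $data")
        else println("Data not found")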
    } finally {
        job.cancel()
    }
}

If you have a better way to solve this problem, please give me a tip.
Thank you.
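For reference, the same use case (first successful page, skip failing hosts, cancel the slower ones) can be sketched with current kotlinx.coroutines Flow APIs. `firstSuccessful` is a hypothetical helper, and each fetcher stands in for the `fetch` call above.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: runs all fetchers concurrently and returns the first non-null result,
// skipping fetchers that fail. Returns null if every fetcher fails or yields null.
// firstOrNull() stops collecting as soon as a value arrives, which cancels the
// channelFlow's scope and therefore every fetcher that is still running.
suspend fun <T : Any> firstSuccessful(fetchers: List<suspend () -> T?>): T? =
    channelFlow<T?> {
        for (fetch in fetchers) {
            launch {
                val result = try {
                    fetch()
                } catch (e: CancellationException) {
                    throw e // never swallow cancellation
                } catch (e: Exception) {
                    null // a failed host is simply skipped
                }
                send(result)
            }
        }
    }.filterNotNull().firstOrNull()
```

Timeouts stay outside, as described above: wrapping the call in `withTimeoutOrNull` would bound the whole race.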

@elizarov
Contributor

How about using select to await for the first loader?

val data = select<Data> {
    loaders.forEach {
        it.onAwait { it }
    }
}

@fvasco
Contributor Author

fvasco commented Jul 17, 2018

Hi @elizarov,
it is the same as

suspend fun <E : Job> Iterable<E>.joinFirst(): E = select {
    for (job in this@joinFirst) {
        job.onJoin { job }
    }
}

suspend fun <E : Deferred<R>, R> Iterable<E>.awaitFirst(): R = joinFirst().getCompleted()

I rethought my solution, and my first proposal is inefficient for my own use case.
I need something like first { }, but instead of following the send order I am looking for the first completed job.

So my proposal is something like a joinChannel, where deferred tasks are reordered by completion time.

I suspect that this issue is related to #172, but I have to work with already created Deferreds.

@elizarov
Contributor

@fvasco So, you are basically looking for the following extension:

fun <T : Job> List<T>.joinChannel(): ReceiveChannel<T> = produce {
    val remaining = this@joinChannel.toMutableSet()
    while (remaining.isNotEmpty()) {
        select<Unit> {
            for (job in remaining) job.onJoin {
                send(job)
                remaining.remove(job)
            }
        }
    }
}

This implementation is O(N^2), but am I understanding correctly what you want?
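The quadratic cost comes from the select loop re-registering a clause for every remaining job on each iteration: N + (N − 1) + … + 1 = N(N + 1)/2 registrations in total. A linear alternative, sketched here with kotlinx.coroutines 1.x APIs, starts one lightweight watcher coroutine per job; `completionOrder` is a hypothetical name.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// Hypothetical O(N) variant: one watcher coroutine per job, each of which sends its job
// into the channel once that job completes. The channel therefore yields jobs in
// completion order, and produce closes it after the last watcher (a child) finishes.
fun <T : Job> CoroutineScope.completionOrder(jobs: Iterable<T>): ReceiveChannel<T> = produce {
    for (job in jobs) {
        launch {
            job.join() // returns normally even if the job failed or was cancelled
            send(job)
        }
    }
}
```

Cancelling the returned channel cancels the producer and, with it, all the watchers; the watched jobs themselves are left untouched.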

@fvasco
Contributor Author

fvasco commented Jul 18, 2018

Yes @elizarov, that is correct.

That is how I solved my problem: five days ago I wrote a very similar ad hoc implementation (I named the remaining list loaders). I currently use this strategy for only three tasks, and I cancel the pending tasks as soon as possible, so it is fine for me.

Eleven days ago I proposed an untested POC with O(N) complexity.

However, I consider this issue closely linked to #172.
To solve that issue we need to parallelize the map/reduce operators; in this issue I proposed mapAsync.
In this issue we need to sort a ReceiveChannel of asynchronous tasks using a select clause; personally, I only need onJoin.

My recent question #172 (comment) is about the link between these issues.
We could solve this task by writing something like:

val urls: List<String> = TODO()
val data = urls
        .asReceiveChannel()
        .mapNotNull(parallelism = urls.size, preserveOrder = false) { fetch(it) }
        .first()

@elizarov
Contributor

@fvasco Thanks. It is indeed related. I wonder whether we should really blow up the parameter lists of all the different operators or, maybe, provide a single extension so that you can write something like:

val urls: List<String> = TODO()
val data = urls
        .asReceiveChannel() // optional? define parallel for lists too? 
        .parallel(urls.size) // order is not preserved by default (and maybe never preserved???) 
        .mapNotNull { fetch(it) }
        .first()
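The `.parallel(n)` operator above is only a proposal. With today's Flow API a similar effect (at most n concurrent transforms, results emitted in completion order) can be approximated with `flatMapMerge`, a real operator that still requires an opt-in. `parallelMapNotNull` is a hypothetical name for this sketch.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical sketch of `.parallel(n).mapNotNull { }`: flatMapMerge runs at most
// `parallelism` inner flows at once and emits their values as soon as they are produced,
// so order is NOT preserved. Null results are dropped.
@OptIn(FlowPreview::class, ExperimentalCoroutinesApi::class)
fun <T, R : Any> Flow<T>.parallelMapNotNull(parallelism: Int, transform: suspend (T) -> R?): Flow<R> =
    flatMapMerge(parallelism) { value ->
        flow { transform(value)?.let { emit(it) } }
    }
```

Used as `urls.asFlow().parallelMapNotNull(urls.size) { fetch(it) }.first()`, the terminal `first()` cancels the fetches that are still running once a value arrives.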

@SolomonSun2010

SolomonSun2010 commented Sep 10, 2018

Good. Dart's Future has wait-any and wait-all methods (Future.any and Future.wait):
https://api.dartlang.org/stable/2.0.0/dart-async/Future-class.html

And Dart's async*, yield* and await for offer much nicer ergonomics:
https://www.dartlang.org/guides/language/language-tour#asynchrony-support
@elizarov, may I suggest supporting the above in Kotlin?

For instance, in Dart:

  await for (var request in requestServer) {
    handleRequest(request);
  }

maybe the equivalent in Kotlin would be:

  for (var request.await() in requestServer) {
    handleRequest(request)
  }

Or, like in Scala:

  for (request <- requestServer) yield handleRequest(request)

@elizarov
Contributor

elizarov commented Sep 10, 2018

@SolomonSun2010 We've designed Kotlin coroutines for more ergonomics. In Kotlin these examples look like this:

for (request in requestServer) {
    handleRequest(request)
}

It is cleaner, with less boilerplate, in Kotlin (there is no need to explicitly write await if you use suspending functions and channels). As an added bonus, it is not hardcoded into the language (unlike Dart) but is implemented in a library, so we can have an even greater variety of useful constructs without having to release an update to the language. I invite you to study the coroutines guide to learn about the other features we offer.
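To make the loop above concrete, here is a small sketch with kotlinx.coroutines 1.x, where `requestServer` is a hypothetical `produce`-based stand-in for a real request source and `handleAll` is a hypothetical wrapper around the `for` loop.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// Hypothetical stand-in for `requestServer`: a channel that produces three requests and then closes.
fun CoroutineScope.requestServer(): ReceiveChannel<String> = produce {
    for (i in 1..3) send("request-$i")
}

// The loop suspends while waiting for the next request and ends when the channel is closed;
// no explicit await appears anywhere.
suspend fun handleAll(requests: ReceiveChannel<String>, handleRequest: suspend (String) -> Unit) {
    for (request in requests) {
        handleRequest(request)
    }
}
```

Iterating a `ReceiveChannel` with `for` is what plays the role of Dart's `await for` here; the suspension is implicit in the channel's iterator.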

@SolomonSun2010

SolomonSun2010 commented Sep 13, 2018

Many people treat Kotlin as a better Java, and I appreciate Kotlin's focus on ergonomics.
Hence, perhaps some best-practice utilities from popular third-party libraries, such as Guava and Eclipse Collections, could be absorbed into Kotlin.
https://google.github.io/guava/releases/snapshot-jre/api/docs/
There are allAsList() and other handy utilities there.
Thanks for Kotlin extensions, they make this more feasible. @elizarov
By the way, R language fans will feel at home with Dispatcher.Android, Dispatcher.XXX, because they are familiar with data.frame, component.name... this dot-naming style.
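As a point of comparison for Guava's `Futures.allAsList` (and Dart's `Future.wait`), kotlinx.coroutines 1.x provides `awaitAll()` on a collection of `Deferred`s: it preserves input order and fails as soon as any element fails. A minimal sketch, where `loadAll` and its `load` parameter are hypothetical:

```kotlin
import kotlinx.coroutines.*

// Hypothetical helper: starts one async load per id and waits for all of them.
// awaitAll() returns the values in the order of `ids`, not in completion order,
// and rethrows the first failure; coroutineScope then cancels the remaining loads.
suspend fun loadAll(ids: List<Int>, load: suspend (Int) -> String): List<String> = coroutineScope {
    ids.map { id -> async { load(id) } }.awaitAll()
}
```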

@qwwdfsad
Collaborator

qwwdfsad commented Nov 6, 2018

Closing as obsolete

4 participants