Provide an API to race coroutines ("losers" are cancelled) #2867

Open
LouisCAD opened this issue Aug 10, 2021 · 15 comments

Comments

@LouisCAD
Contributor

Hello,

I keep seeing the use case of racing suspend functions come up on Kotlin's Slack, and I personally need it quite often, as it suits many needs in "coroutines-heavy" code.

I've been using my own solution for a long time now, which I also published as part of the Splitties library (available on MavenCentral), and I haven't found any better API than raceOf({…}, { … }, …) and race { launchRacer { … } }.

Here is an example snippet of raceOf in action taken straight from the README of Splitties Coroutines:

suspend fun testCoroutinesRacing() {
    val result = raceOf({
        delay(3)
        "slow"
    }, {
        delay(0)
        "fast"
    })
    assertEquals(expected = "fast", actual = result)
}

and here's a permalink to the code of these 2 APIs.

Can this API or a better alternative be considered for addition inside kotlinx.coroutines?
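
For readers without the library at hand, here is a minimal sketch of how such a function could be built on top of what kotlinx.coroutines already offers. The name raceOfSketch is hypothetical, and this is not the Splitties implementation; it only assumes the usual async, select and cancelChildren APIs.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.cancelChildren
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.selects.select

// Hypothetical helper (not the Splitties code): starts every racer
// concurrently, returns the first result, then cancels the remaining racers.
suspend fun <T> raceOfSketch(vararg racers: suspend () -> T): T {
    require(racers.isNotEmpty()) { "At least one racer is required." }
    return coroutineScope {
        val deferreds = racers.map { racer -> async { racer() } }
        select<T> {
            // Register one onAwait clause per racer; the first to complete wins.
            deferreds.forEach { deferred -> deferred.onAwait { it } }
        }.also { coroutineContext.cancelChildren() } // Cancel the "losers".
    }
}

fun main() = runBlocking {
    val result = raceOfSketch(
        { delay(300); "slow" },
        { delay(20); "fast" },
    )
    println(result)
}
```

Note that a racer that throws fails the whole race in this sketch, because async propagates the failure to the enclosing coroutineScope.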

@qwwdfsad
Member

Hi,

Could you please show a few examples of raceOf from a real-world project? I remember discussing this at some point, but AFAIR we haven't found any compelling use cases for it.

@LouisCAD
Contributor Author

In the main project on my day job, I have close to 100 usages of that function FYI.

I'm going to bring a curated summary of the most interesting use cases a bit later.

@fvasco
Contributor

fvasco commented Aug 13, 2021

A similar race API has already been discussed in #424.

@qwwdfsad are you looking for this? #424 (comment)

@LouisCAD
Contributor Author

LouisCAD commented Sep 2, 2021

First simple and compelling use case is for timeouts:
With raceOf, you can replace withTimeoutOrNull without introducing ambiguity as to why the result of the enclosing expression is null, and you can replace withTimeout without having to surround it with a try/catch block:

suspend fun whatever(
    timeout: Duration = Duration.seconds(6)
): SomeResult = raceOf({
    SomeResult.Yay(value = doStuffAndGetSomethingQuiteQuickly())
}, {
    delay(timeout)
    SomeResult.TimedOut(after = timeout)
})
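
To make the ambiguity point concrete, here is a self-contained sketch of the same idea that does not depend on raceOf. FetchResult and fetchOrTimeout are hypothetical names; the race is written with the plain async/select combination from kotlinx.coroutines. A null produced by the operation itself stays distinguishable from a timeout:

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.cancelChildren
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.selects.select
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds

// Hypothetical result type: a legitimate null value and a timeout are
// different cases, unlike with withTimeoutOrNull.
sealed interface FetchResult {
    data class Done(val value: String?) : FetchResult
    data class TimedOut(val after: Duration) : FetchResult
}

// Hypothetical helper: races the operation against a delay.
suspend fun fetchOrTimeout(
    timeout: Duration,
    fetch: suspend () -> String?
): FetchResult = coroutineScope {
    select<FetchResult> {
        async { FetchResult.Done(fetch()) }.onAwait { it }
        async { delay(timeout); FetchResult.TimedOut(timeout) }.onAwait { it }
    }.also { coroutineContext.cancelChildren() } // Stop whichever branch lost.
}

fun main() = runBlocking {
    // The operation completes in time and legitimately yields null.
    println(fetchOrTimeout(200.milliseconds) { delay(20); null })
    // The operation is too slow, so the timeout branch wins.
    println(fetchOrTimeout(50.milliseconds) { delay(10_000); "late" })
}
```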

Then it works well to surface cancellability in a way that keeps everything sequential:

suspend fun tryDoingStuff(awaitCancelRequest: suspend () -> Unit) = raceOf({
    doStuff() // When complete, awaitCancelRequest() will be cancelled.
}, {
    awaitCancelRequest() // doStuff() would be cancelled automatically.
})

It also works well when you have multiple ways to provide some input data:

suspend fun getToken(ui: RequiresLoginUi) = raceOf({
    getTokenFromStorage()?.takeUnless { it.isStale() } ?: awaitCancellation()
}, login@{
    repeatWhileActive {
        val credentials = raceOf({
            getSavedCredentialsFromAutoFill()
        }, {
            ui.awaitCredentialsSubmitting()
        })
        when (val loginResult = attemptLogin(credentials)) {
            is Attempt.Success -> return@login loginResult.token
            is Attempt.Failure -> ui.awaitLoginFailureAcknowledgement(loginResult)
        }
    }
})

suspend inline fun repeatWhileActive(block: () -> Unit): Nothing {
    while (true) {
        coroutineContext.ensureActive()
        block()
    }
}

It works well when you want to enforce a certain UX process where there is ambiguity, regardless of how the UI is implemented:

suspend fun awaitTrigger() = raceOf({
    awaitAutomaticTrigger()
}, {
    awaitManualTrigger()
})

suspend fun awaitDoorLockRequest(
    voiceTrigger: VoiceTrigger,
    awaitLockButtonClick: suspend () -> Unit
) = raceOf({
    voiceTrigger.awaitHotPhrase()
}, {
    awaitLockButtonClick()
})

suspend fun runSomeOnboardingStep(ui: SomeUi) = raceOf({
    ui.awaitExistingUserConfirmation()
}, {
    ui.awaitRequestToWatchQuickIntro()
    raceOf({
        ui.showQuickIntroVideoUntilComplete()
    }, {
        ui.awaitSkipRequest()
    })
})

@dkhalanskyjb
Collaborator

The following are just some thoughts on the matter.

In general, this can be thought of as non-atomic selects. In fact, I see that this library implements this via selects in a non-atomic manner: https://github.com/LouisCAD/Splitties/blob/7539eaa9fa59fa92720c7ba5abd559ae4fc9172e/modules/coroutines/src/commonMain/kotlin/splitties/coroutines/Racing.kt

So, select vs raceOf (or nonAtomicSelect):

  • select is atomic, so, by design, if one branch finishes successfully, the others shouldn't have an effect. With nonAtomicSelect, one has to consider the implications of several operations successfully finishing.
  • nonAtomicSelect works for everything that's cancellable. Specific operations don't require any special treatment.
  • As a consequence, nonAtomicSelect is somewhat easier to use: just run the operations and don't guess what onZZZ to use.
  • select is somewhat prettier, with its builder syntax.
  • nonAtomicSelect doesn't separate the async operation and the post-processing of the result. This may simplify using it, but at the cost of the concurrent operations working for longer than they should if the post-processing is long.

@LouisCAD
Contributor Author

The post-processing is only cancelling "losers" here, unless we want to consider an API that allows a given set of winners, which would be interesting, albeit beyond the scope of this specific issue that already has clear use cases IMHO.

Or could the risk you're mentioning cause cross-cancellation, where no one wins because there's a tie? I think such behavior would not work well for application use cases; it would be best to pick the first racer, or a random one, in this rare case.

@dkhalanskyjb
Collaborator

By post-processing I mean the part after the asynchronous part of the operation is complete. In select, it's the things that are in the code blocks:

select<Int> {
  channel1.onReceive { v ->
    Thread.sleep(1000) // post-processing
    v
  }
  channel2.onReceive { v ->
    Thread.sleep(1000) // post-processing
    v
  }
}

In select, after the asynchronous operation is itself completed, all the other operations stop. In your implementation, the various branches only cancel the other operations when the value is ready to be returned.
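
The difference can be sketched as follows. All names here are hypothetical, and the "raceOf-style" function is a stand-in written with async/select rather than the library's code. When post-processing is part of a racer, the branch whose asynchronous part finished first can still lose; when it lives in the select clause, the winner is fixed as soon as the asynchronous part completes.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.cancelChildren
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.selects.select

// Hypothetical stand-in for slow post-processing of a result.
suspend fun postProcess(value: String): String {
    delay(200)
    return "$value (processed)"
}

// raceOf-style: post-processing is part of the racer, so the race is decided
// only once a whole block, post-processing included, has finished.
suspend fun raceWholeBlocks(): String = coroutineScope {
    val a = async { delay(20); postProcess("A") } // done after ~220 ms
    val b = async { delay(100); "B" }             // done after ~100 ms
    select<String> {
        a.onAwait { it }
        b.onAwait { it }
    }.also { coroutineContext.cancelChildren() }
}

// select-style: the race is decided when the asynchronous part completes;
// post-processing then runs inside the selected clause, and the other clause
// can no longer win.
suspend fun raceAsyncPartsOnly(): String = coroutineScope {
    val a = async { delay(20); "A" }
    val b = async { delay(100); "B" }
    select<String> {
        a.onAwait { postProcess(it) }
        b.onAwait { it }
    }.also { coroutineContext.cancelChildren() }
}

fun main() = runBlocking {
    println(raceWholeBlocks())    // B: A's async part was first, but A lost
    println(raceAsyncPartsOnly()) // A (processed)
}
```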

A middle ground between raceOf and select is something like what Rust does (https://docs.rs/futures/0.3.4/futures/macro.select.html). It has an interface similar to our select, but it's implemented similarly to raceOf.

@elizarov
Contributor

Note that you can already use select to race coroutines, but it takes a bit of additional boilerplate:

suspend fun main() {
    val result = coroutineScope {
        select<String> {
            async {
                delay(300)
                println("slow")
                "Slow Result"
            }.onAwait { it }
            async {
                delay(200)
                println("fast")
                "Fast Result"
            }.onAwait { it }
        }.also { coroutineContext.cancelChildren() }
    }
    println("Result=$result")
}

I'd suggest starting directly from here and splitting it into two features.

The first one is the combination of coroutineScope { select { ... }.also { coroutineContext.cancelChildren() } } which introduces a scope that extends SelectBuilder with an additional DSL (see below). Let's call it __selectAndCancelRest__ for now.

The other is a combination of async { ... }.onAwait { it }. Let's call it __async__ for now. With those two pieces, you can now write:

suspend fun main() {
    val result = __selectAndCancelRest__<String> {
        __async__ {
            delay(300)
            println("slow")
            "Slow Result"
        }
        __async__ {
            delay(200)
            println("fast")
            "Fast Result"
        }
    }
    println("Result=$result")
}

This construct is more composable than a plain raceOf, as you can also do other onXxx clauses inside and, for example, easily throw receiving from a channel into the mix that would win and cancel all the ongoing __async__ operations.
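
As a rough illustration of that composability using only what exists today, the boilerplate form can already mix a channel clause in with the racing coroutines. raceWithChannel is a hypothetical name, and the sketch does not attempt the proposed DSL itself:

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.cancelChildren
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.selects.select

// Hypothetical example: two racing coroutines plus a channel clause.
// Whichever clause is selected first wins; the async children are then
// cancelled when the select returns.
suspend fun raceWithChannel(overrides: ReceiveChannel<String>): String =
    coroutineScope {
        select<String> {
            async { delay(300); "Slow Result" }.onAwait { it }
            async { delay(200); "Fast Result" }.onAwait { it }
            overrides.onReceive { "Channel Result: $it" }
        }.also { coroutineContext.cancelChildren() }
    }

fun main() = runBlocking {
    val overrides = Channel<String>(capacity = 1)
    // Nothing in the channel: the faster async wins.
    println(raceWithChannel(overrides))
    // A buffered element is available immediately, so the channel clause wins.
    overrides.send("override")
    println(raceWithChannel(overrides))
}
```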

P.S. Naming needs a careful design here.

@fvasco
Contributor

fvasco commented Dec 21, 2021

This looks like #1065

@elizarov
Contributor

elizarov commented Dec 21, 2021

It does not have to involve #1065 at all. In order for the race DSL to be clear, it is critical not to expose the internal CoroutineScope (that gets its children cancelled after select) to the outside public. It can be very error-prone if you can accidentally launch inside this scope, and have your freshly launched coroutine cancelled on the scope exit, violating your usual expectation on how children work (e.g. channel.onReceive { it -> launch { doSomething(it) } } suddenly compiles, but does not work).

It is quite important here to clearly name all __xxx__ pieces in this DSL to carry this "implicit cancellation" meaning and avoid misunderstanding.

@fvasco
Contributor

fvasco commented Dec 22, 2021

@elizarov I think this use case is a more specific instance of #1065, and a recurring problem like #424.
This issue proposes a DSL to fix this specific use case, but it doesn't resolve similar use cases (for example, it still allows a leak in a channel's receive).

Using the #1065 idea, it is possible to implement your idea:

val result = coroutineScope {
  select<String> {
    async (onScopeTermination = ScopeTermination.CANCEL) {
      delay(300)
      println("slow")
      "Slow Result"
    }.onAwait { it }
    async (onScopeTermination = ScopeTermination.CANCEL) {
      delay(200)
      println("fast")
      "Fast Result"
    }.onAwait { it }
  }
}
println("Result=$result")

I understand that this version is more verbose because the syntax is not specific to this issue.

@LouisCAD
Contributor Author

The wrapping coroutineScope looks like boilerplate and adds unwanted indentation in the snippet posted just above. Is it required?

@fvasco
Contributor

fvasco commented Dec 22, 2021

I reused the #1065 idea, similar to elizarov's first example, so an extra coroutineScope is required.

I am not convinced that __selectAndCancelRest__ is a good idea. I prefer an API similar to onTimeout (the clause either fires or is cancelled), something like:

suspend fun main() {
    val result = select<String> {
        __onAwait__ {
            delay(300)
            println("slow")
            "Slow Result"
        }
        __onAwait__ {
            delay(200)
            println("fast")
            "Fast Result"
        }
    }
    println("Result=$result")
}

@lowasser
Contributor

lowasser commented Mar 7, 2023

Usually, I find this to be the most convenient idiom:

suspend fun <R> race(vararg races: suspend () -> R): R {
  return channelFlow {
    for (race in races) {
      launch { send(race()) }
    }
  }.first()
}

...instead of bothering with any select shenanigans at all. This doesn't need any extra work to handle cancellation.
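
A small usage sketch of this idiom, with the imports it needs. The racers and the loserCancelled flag are made up for illustration; the flag is only there to observe that the slower branch gets cancelled once first() has its value.

```kotlin
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Same idiom as above: each racer sends its result into the flow, and
// first() returns the earliest one, which cancels the flow's producer scope.
suspend fun <R> race(vararg races: suspend () -> R): R =
    channelFlow {
        for (race in races) {
            launch { send(race()) }
        }
    }.first()

fun main() = runBlocking {
    var loserCancelled = false // Illustrative flag, not part of the idiom.
    val winner = race(
        {
            try {
                delay(10_000)
                "slow"
            } catch (e: CancellationException) {
                loserCancelled = true
                throw e // Always rethrow cancellation.
            }
        },
        { delay(20); "fast" },
    )
    println("winner=$winner, loserCancelled=$loserCancelled")
}
```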

@LouisCAD
Contributor Author

LouisCAD commented Mar 7, 2023

I'm wondering whether that interesting solution leveraging channelFlow is more or less efficient than the solution using select. Maybe @elizarov knows?

6 participants