Customizable coroutine behaviour on scope termination #1065

Open
fvasco opened this issue Apr 2, 2019 · 15 comments

Comments

@fvasco
Contributor

fvasco commented Apr 2, 2019

Currently, this library lets us define how a coroutine starts (using the start parameter) and how it reacts when a dependency fails (using jobs).
Following structured concurrency, it assumes that all child coroutines are joined when the scope terminates.

In my experience, this decision does not fit every use case: for some async or produce coroutines it is preferable to cancel the coroutine instead of joining it.
So I would like to propose a parameter to customize a coroutine's behaviour on scope termination.

This proposal does not conflict with structured concurrency.

In the example code

    val secret: String? = null
    val userDeferred = async(start = CoroutineStart.LAZY) { loadUser() }

    if (secret != null) {
        require(secret == userDeferred.await().secret)
    }

structured concurrency imposes an implicit join of all children, as if we had written:

    val secret: String? = null
    val userDeferred = async(start = CoroutineStart.LAZY) { loadUser() }
    try {
        if (secret != null) {
            require(secret == userDeferred.await().secret)
        }
    } finally {
        userDeferred.join()
    }

So the try-finally is implicit; unfortunately, the first example hangs indefinitely, because the lazy coroutine is never started and the scope waits for it forever. To avoid this, we MUST write:

    val secret: String? = null
    val userDeferred = async(start = CoroutineStart.LAZY) { loadUser() }
    try {
        if (secret != null) {
            require(secret == userDeferred.await().secret)
        }
    } finally {
        userDeferred.cancel()
    }

Structured concurrency should spare us from handling this manually.
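A runnable version of the manual workaround shows that, with the explicit cancel in finally, the enclosing coroutineScope returns even when the lazy async is never started. This is only a sketch: User, loadUser() and checkSecret() are hypothetical stand-ins, and checkSecret returns a Boolean instead of calling require so the result is easy to inspect.

```kotlin
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay

// Hypothetical domain type and loader, used only for illustration.
data class User(val secret: String)

suspend fun loadUser(): User {
    delay(100) // simulate a slow remote call
    return User(secret = "s3cr3t")
}

// Hypothetical check: true when there is no secret to verify or when it matches the user's secret.
suspend fun checkSecret(secret: String?): Boolean = coroutineScope {
    val userDeferred = async(start = CoroutineStart.LAZY) { loadUser() }
    try {
        // The lazy coroutine is started only if we actually await it.
        secret == null || secret == userDeferred.await().secret
    } finally {
        // Without this, coroutineScope would wait forever for the never-started child.
        // Cancelling an already completed Deferred is a no-op.
        userDeferred.cancel()
    }
}
```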

Proposal

My proposal is to add a parameter that customizes the behaviour on scope termination, i.e.: default, join, cancel.

For example:

    val secret: String? = null
    val userDeferred = async(start = CoroutineStart.LAZY, onScopeTermination = ScopeTermination.CANCEL) { loadUser() }

    if (secret != null) {
        require(secret == userDeferred.await().secret)
    }

Alternatives

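This problem has already been solved for ReceiveChannel with the consume function; we can copy the same approach for Deferred:

import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.Deferred

inline fun <E, R> Deferred<E>.consume(block: (Deferred<E>) -> R): R {
    var cause: Throwable? = null
    try {
        return block(this)
    } catch (e: Throwable) {
        cause = e
        throw e
    } finally {
        // Job.cancel(Throwable) is no longer available: wrap a failure in a CancellationException
        cancel(cause as? CancellationException ?: cause?.let { CancellationException("Deferred was consumed with an exception", it) })
    }
}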

So the first example becomes:

    val secret: String? = null
    async(start = CoroutineStart.LAZY) { loadUser() }.consume { userDeferred ->
        if (secret != null) {
            require(secret == userDeferred.await().secret)
        }
    }

The drawback is that we have to introduce a nested block, similar to the use function that mimics Java's try-with-resources.

Future improvements:

Introducing a new termination behaviour would allow us to define a similar API that avoids the nested block:

fun CoroutineScope.cancelOnScopeTermination(job: Job) {
    launch(start = CoroutineStart.LAZY, onScopeTermination = ScopeTermination.JOIN) {
        job.cancel()
    }
}

    val secret: String? = null
    val userDeferred = async(start = CoroutineStart.LAZY) { loadUser() }
    cancelOnScopeTermination(userDeferred)

    if (secret != null) {
        require(secret == userDeferred.await().secret)
    }

This requires an extra allocation and lacks exception handling, but I consider scope finalizers a possible future enhancement.

What do you think?
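Something close to scope finalizers can be approximated today without new library support. The sketch below assumes hypothetical names (ScopeFinalizers, withScopeFinalizers, demoFinalizers) that are not kotlinx.coroutines API: jobs registered through cancelOnScopeTermination are cancelled when the block returns or throws, while all other children are still joined as usual.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.Job
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch

// Hypothetical holder of jobs that must be cancelled (not joined) when the block ends.
// Not thread-safe: register jobs from the block's own coroutine.
class ScopeFinalizers {
    private val jobs = mutableListOf<Job>()

    fun cancelOnScopeTermination(job: Job) {
        jobs += job
    }

    internal fun runAll() {
        jobs.forEach { it.cancel() }
    }
}

// Runs [block] in a coroutineScope and cancels every registered job once the block finishes,
// whether it returned normally or threw. Unregistered children are joined as usual.
suspend fun <R> withScopeFinalizers(
    block: suspend CoroutineScope.(ScopeFinalizers) -> R,
): R = coroutineScope {
    val finalizers = ScopeFinalizers()
    try {
        block(finalizers)
    } finally {
        finalizers.runAll()
    }
}

// Usage: the registered lazy async is cancelled on exit, while the launched child is joined.
suspend fun demoFinalizers(): Pair<Boolean, Boolean> {
    var childFinished = false
    val lazy = withScopeFinalizers { finalizers ->
        val deferred = async(start = CoroutineStart.LAZY) { 42 }
        finalizers.cancelOnScopeTermination(deferred)
        launch {
            delay(10)
            childFinished = true
        }
        deferred
    }
    return lazy.isCancelled to childFinished
}
```

Because registration happens inside the block, this avoids the atomicity gap of a separate launch, but it still requires the nested block that the proposal wants to avoid.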

@fvasco
Contributor Author

fvasco commented Apr 3, 2019

By analogy with structured programming, this library covers common patterns:

  • function invocation becomes async/await
  • object message passing is pending #87
  • delegation is unplanned KT-20414
  • when becomes select
  • for loops are pending #328
  • collection operators are pending #1022

try-catch-finally looks largely uncovered to me.
The suspending version does not behave like its structured-programming counterpart:

import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun main() {
    println("Start")
    runBlocking {
        try {
            coroutineScope {
                launch { TODO() }
            }
        } catch (e: Exception) {
            e.printStackTrace()
        } finally {
            launch { println("Done!") }
        }
    }
    println("End")
}

@elizarov
Contributor

Thanks for this write-up. This is an interesting use-case explaining the problem of scope termination for lazy coroutines.

I don't like the solution with the onScopeTermination parameter because it does not really scale to the different coroutine builders that might need the same handling. The cancelOnScopeTermination solution looks more promising, but it suffers from non-atomicity: e.g., if something goes wrong between async and cancelOnScopeTermination, it still hangs.

I actually think that we can simply adjust the existing behavior of lazy coroutines on scope termination, that is, make cancellation of non-started lazy coroutines on scope termination the default behavior, without providing any other configuration. You've shown a good use-case for a combination of CoroutineStart.LAZY and ScopeTermination.CANCEL, but

  • Are there any use-cases for CoroutineStart.DEFAULT and ScopeTermination.CANCEL? Do we need to support it?
  • Are there any use-cases for CoroutineStart.LAZY with some handling of non-started coroutines other than cancelling them?
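The default suggested above (cancel lazy coroutines that were never started when the scope's block ends) can be emulated in user code. A sketch under assumptions: lazyAwareScope and demoLazyAware are hypothetical names, and the helper relies on Job.children and on the documented "New" job state, in which isActive, isCompleted and isCancelled are all false.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.job
import kotlinx.coroutines.launch

// Runs [block] in a coroutineScope; when it ends, cancels only children still in the "New" state
// (lazy and never started). Started children are joined as usual. A child started concurrently by
// another coroutine between the check and the cancel could still be cancelled.
suspend fun <R> lazyAwareScope(block: suspend CoroutineScope.() -> R): R = coroutineScope {
    try {
        block()
    } finally {
        coroutineContext.job.children
            .filter { !it.isActive && !it.isCompleted && !it.isCancelled }
            .toList() // snapshot before cancelling
            .forEach { it.cancel() }
    }
}

// Usage: the unstarted lazy async is cancelled, while the eagerly launched child is joined.
suspend fun demoLazyAware(): Pair<Boolean, Boolean> {
    var eagerFinished = false
    val lazy = lazyAwareScope {
        launch {
            delay(10)
            eagerFinished = true
        }
        async(start = CoroutineStart.LAZY) { "never needed" }
    }
    return lazy.isCancelled to eagerFinished
}
```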

@elizarov elizarov changed the title from "Customizable coroutine behavior on scope termination." to "Customizable coroutine behavior on scope termination" Apr 10, 2019
@fvasco
Copy link
Contributor Author

fvasco commented Apr 10, 2019

@elizarov

Are there any use-cases for CoroutineStart.DEFAULT and ScopeTermination.CANCEL? Do we need to support it?

The following is production code: we take a value from a Channel if one is available, otherwise we race the Channel against a request for a fresh value.

val value = channel.poll() ?: run {
    val requestDeferred = async { requestValue() }

    try {
        select<Int> {
            channel.onReceive { it }
            requestDeferred.onAwait { it }
        }
    } finally {
        requestDeferred.cancel()
    }
}

However, at the cost of some extra allocations, it is possible to rewrite it using a lazy async.
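A self-contained version of the eager pattern from the production code, as a sketch: requestValue() here is a hypothetical stand-in with a fixed result, nextValue is a hypothetical name, the body is wrapped in coroutineScope instead of running in an outer scope, and tryReceive().getOrNull() replaces poll(), which is deprecated in current kotlinx.coroutines versions.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.selects.select

// Hypothetical slow request for a fresh value.
suspend fun requestValue(): Int {
    delay(50)
    return 42
}

// Takes a buffered value if there is one; otherwise races the channel against a fresh request.
suspend fun nextValue(channel: ReceiveChannel<Int>): Int =
    channel.tryReceive().getOrNull() ?: coroutineScope {
        val requestDeferred = async { requestValue() }
        try {
            select<Int> {
                channel.onReceive { it }
                requestDeferred.onAwait { it }
            }
        } finally {
            // The request may already be running; cancel it so coroutineScope can return
            // as soon as the channel wins the race.
            requestDeferred.cancel()
        }
    }
```

Here the async is started eagerly, so a lazy-only cancellation rule would not help; it has to be cancelled even though it already runs.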

@elizarov
Contributor

Making it async(start = LAZY) does not incur any extra allocations.

@elizarov
Contributor

However, I see the problem here. You want to cancel your async on scope exit even if it was already started. This is indeed a use-case for cancellation on scope exit that is orthogonal to whether it is lazy or not, already started or not.

@fvasco fvasco changed the title from "Customizable coroutine behavior on scope termination" to "Customizable coroutine behaviour on scope termination" Sep 17, 2019
@fvasco fvasco mentioned this issue Apr 5, 2021
@robbiebowman

Is there any plan to change the scope termination behaviour for lazy coroutines? It seems counterintuitive that I have to cancel an async I never started.

@jameslamine

Are there any plans to implement this? I like this proposal. Right now I'm doing:

supervisorScope {
    try {
        val data1 = this.async(start = CoroutineStart.LAZY) { getData1() }
        val data2 = this.async(start = CoroutineStart.LAZY) { getData2() }

        maybeUseData(data1, data2)
    } finally {
        this.coroutineContext.cancelChildren()
    }
}

But it would be nice if I can just do:

supervisorScope {
    val data1 = this.async(start = CoroutineStart.LAZY, onScopeTermination = ScopeTermination.CANCEL) { getData1() }
    val data2 = this.async(start = CoroutineStart.LAZY, onScopeTermination = ScopeTermination.CANCEL) { getData2() }

    maybeUseData(data1, data2)
}
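Until something like this exists, the try/finally workaround can at least be written once. A minimal sketch, assuming hypothetical names supervisorScopeCancellingChildren and demoCancellingChildren: unlike the proposal, it cancels every child still running when the block ends, started or not, rather than only the ones marked CANCEL.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitCancellation
import kotlinx.coroutines.cancelChildren
import kotlinx.coroutines.launch
import kotlinx.coroutines.supervisorScope

// Like supervisorScope, but cancels all remaining children once [block] returns or throws,
// instead of waiting for them to complete.
suspend fun <R> supervisorScopeCancellingChildren(block: suspend CoroutineScope.() -> R): R =
    supervisorScope {
        try {
            block()
        } finally {
            coroutineContext.cancelChildren()
        }
    }

// Usage with placeholder loaders: data2 is never awaited and the launched job never ends,
// so a plain supervisorScope would wait for them forever.
suspend fun demoCancellingChildren(): Int = supervisorScopeCancellingChildren {
    val data1 = async(start = CoroutineStart.LAZY) { 1 }
    val data2 = async(start = CoroutineStart.LAZY) { awaitCancellation() } // never awaited
    launch { awaitCancellation() } // already running, still cancelled on exit
    data1.await()
}
```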

@matejdro

matejdro commented Jul 24, 2023

Another use case is collecting the first value emitted by any of several flows. As far as I know, this can only be done using select, but to use flows in select you first have to convert them to Channels using produceIn:

suspend fun getFirstEmittedFromMultipleFlows() = coroutineScope {
    val channelA = flowA.produceIn(this)
    val channelB = flowB.produceIn(this)
    val channelC = flowC.produceIn(this)

    select {
        channelA.onReceive {
            ...
        }
        channelB.onReceive {
            ...
        }
        channelC.onReceive {
            ...
        }
    }
}

By default, the above code will hang until all three flows complete on their own. If I want it to return immediately after the first channel receives a value, I have to manually wrap the code in try-finally and call coroutineContext.cancelChildren(), which is pretty verbose. It would be nice to tell coroutineScope to automatically cancel all children once the block completes.
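The manual workaround described above might look like the sketch below, assuming the flows are passed as a list; firstEmitted and demoFirstEmitted are hypothetical names. One caveat: onReceive throws if a flow completes without emitting (its channel is closed), so onReceiveCatching may be preferable in real code.

```kotlin
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.awaitCancellation
import kotlinx.coroutines.cancelChildren
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.produceIn
import kotlinx.coroutines.selects.select

// Returns the first value emitted by any of [flows], then cancels the remaining producers
// so that coroutineScope does not wait for the other flows to complete.
@OptIn(FlowPreview::class)
suspend fun <T> firstEmitted(flows: List<Flow<T>>): T = coroutineScope {
    val channels = flows.map { it.produceIn(this) }
    try {
        select<T> {
            channels.forEach { channel -> channel.onReceive { it } }
        }
    } finally {
        coroutineContext.cancelChildren()
    }
}

// Usage: the second flow emits first; the third never emits and would otherwise block the scope.
suspend fun demoFirstEmitted(): String = firstEmitted(
    listOf(
        flow<String> { delay(100); emit("slow") },
        flow<String> { delay(10); emit("fast") },
        flow<String> { awaitCancellation() },
    )
)
```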

@matejdro

matejdro commented Jul 24, 2023

Maybe the solution is to provide a way to create "daemon" coroutines that would not keep the scope from completing, but would still get cancelled by it? Something similar to the backgroundScope in runTest.

@qwwdfsad
Member

Daemon coroutines are indeed an option we are currently exploring.

@e5l
Member

e5l commented Oct 2, 2023

We have the same case: launching a service that should run while the scope is active. For instance:

@Test
fun testWithService() = runBlocking {
    launch {
        startService()
    }

    // ... do stuff
}

will always hang the test. So we have to work around it with GlobalScope.launch + invokeOnCompletion, which behaves differently and looks ugly.
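For the test case specifically, recent versions of kotlinx-coroutines-test (1.7 and later, as far as I know) provide the backgroundScope of runTest mentioned earlier in this thread: coroutines launched there are cancelled when the test body finishes instead of keeping the test waiting. A sketch that needs the kotlinx-coroutines-test dependency; startService() and serviceStarted are hypothetical stand-ins for a service that runs until cancelled.

```kotlin
import kotlinx.coroutines.awaitCancellation
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.runTest

// Hypothetical flag so we can observe that the service actually ran.
var serviceStarted = false

// Hypothetical service: marks itself as started, then runs until cancelled.
suspend fun startService(): Nothing {
    serviceStarted = true
    awaitCancellation()
}

// Same shape as the test above, but the service runs in backgroundScope,
// which runTest cancels once the test body completes.
fun testWithService() = runTest {
    backgroundScope.launch { startService() }
    delay(10) // virtual time: lets the background coroutine run without real waiting
    check(serviceStarted)
}
```

This only covers tests; the non-test cases below still need one of the other approaches.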

@LouisCAD
Contributor

LouisCAD commented Oct 2, 2023

@e5l with a raceOf({ … }, { … }), you could get the behavior you want, without breaking structured concurrency. Its code is in my Splitties coroutines library as there's no such primitive in kotlinx.coroutines yet.

It'd look like this:

raceOf({
    startService(); awaitCancellation()
}, {
    theCode()
})

Or you can do this:

val serviceJob = launch { startService() }
try {
    theCode()
} finally {
    serviceJob.cancel()
}

Both approaches are better than using a different scope like GlobalScope.
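For readers without Splitties, the raceOf idea can be sketched with async and select. This is a simplified illustration, not the Splitties implementation: simpleRaceOf and demoRace are hypothetical names. It runs every racer concurrently, returns the first result, and cancels the losers so the enclosing scope can complete.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitCancellation
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.selects.select

// Runs all racers concurrently and returns the first result; the others are cancelled,
// so coroutineScope returns as soon as a winner is known.
suspend fun <T> simpleRaceOf(vararg racers: suspend CoroutineScope.() -> T): T = coroutineScope {
    val deferreds = racers.map { racer -> async { racer() } }
    try {
        select<T> {
            deferreds.forEach { deferred -> deferred.onAwait { it } }
        }
    } finally {
        deferreds.forEach { it.cancel() }
    }
}

// Usage mirroring the snippet above: a never-ending "service" racing against the actual work.
suspend fun demoRace(): String = simpleRaceOf(
    { awaitCancellation() },
    { delay(10); "work done" },
)
```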

@dkhalanskyjb
Collaborator

@e5l
Member

e5l commented Oct 5, 2023

@dkhalanskyjb, we can, but it's not only for tests. We have the same pattern elsewhere, e.g. for the reading coroutine of a socket.

@floatdrop

floatdrop commented Jan 8, 2024

Daemon coroutines are indeed an option we are currently exploring.

@qwwdfsad, are daemon coroutines bound to a specific CoroutineScope? Otherwise, how would they differ from GlobalScope.launch?

We stumbled on this issue while trying to implement a batching suspend DataLoader, which must collect all suspend load calls made in a scope and terminate its dispatcher when the scope is done.

One way to do this is to launch the dispatch coroutine in GlobalScope and cancel it in an invokeOnCompletion handler on the scope's job. But it feels like a hack.

/**
 * Asynchronous interface for wrapping [DataLoader].
 */
interface SuspendDataLoader<K, V> {
    suspend fun loadMany(keys: List<K>): List<V>
}

/**
 * Creates [SuspendDataLoader], that batches load requests to DataLoader and emits dispatch with debounce.
 */
@OptIn(FlowPreview::class, DelicateCoroutinesApi::class)
fun <K, V> CoroutineScope.launchDataLoader(
    timeout: Duration = 1.milliseconds,
    dataLoaderOptions: DataLoaderOptions = DataLoaderOptions(),
    batchLoader: suspend (List<K>) -> List<V>,
): SuspendDataLoader<K, V> {
    val dispatchRequests = Channel<Unit>(Channel.UNLIMITED)

    val dataLoader = DataLoader({ keys -> future { batchLoader(keys) } }, dataLoaderOptions)

    // Launching batching coroutine in background, so current CoroutineScope will not block and wait for it
    val daemonJob = GlobalScope.launch {
        dispatchRequests.consumeAsFlow()
            .debounce(timeout)
            .collect {
                dataLoader.dispatch()
            }
    }

    // When current CoroutineScope is done - cancel daemonJob to avoid coroutines leak
    coroutineContext.job.invokeOnCompletion {
        daemonJob.cancel()
    }

    return object : SuspendDataLoader<K, V> {
        override suspend fun loadMany(keys: List<K>): List<V> {
            val values = dataLoader.loadMany(keys)
            dispatchRequests.send(Unit)
            return values.await()
        }
    }
}
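The same technique can be factored into a small reusable helper. A sketch under stated assumptions: launchDaemon and demoDaemon are hypothetical names, not kotlinx.coroutines API. The daemon is deliberately not a child of the scope (so the scope does not wait for it), inherits the scope's context minus its Job, and is cancelled from invokeOnCompletion once the scope's job completes. Exceptions thrown by the daemon go to the coroutine exception handler instead of failing the scope.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.awaitCancellation
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.job
import kotlinx.coroutines.launch

// Launches [block] outside the structured hierarchy and cancels it when this scope's job completes.
@OptIn(DelicateCoroutinesApi::class)
fun CoroutineScope.launchDaemon(block: suspend CoroutineScope.() -> Unit): Job {
    // Keep the dispatcher and other elements, but drop the parent Job so the daemon is not a child.
    val daemon = GlobalScope.launch(coroutineContext.minusKey(Job)) { block() }
    coroutineContext.job.invokeOnCompletion { daemon.cancel() }
    return daemon
}

// Usage: coroutineScope returns without waiting for the never-ending daemon.
suspend fun demoDaemon(): Job = coroutineScope {
    launchDaemon { awaitCancellation() }
}
```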
