
Need way to use parallel decomposition of list without saturating dispatcher queue #1022

MartinDevi opened this issue Mar 1, 2019 · 6 comments

@MartinDevi

I think the coroutines library is missing a simple way to apply parallel decomposition to the contents of a large list without saturating the dispatcher queue, so that other coroutine work isn't stalled until the entire list has been processed.

I present the issue here and some solutions that I considered. If you know better solutions, please point them out.

It seems to me like a common enough scenario and a serious enough issue to be considered in the standard library.

Issue

I have a large list of items, and I need to perform CPU-intensive work on each of them (in case you're curious, deciphering) to obtain an output. I'd like this work to be done in parallel on each item. My first instinct was simply to use async to start all the tasks, then await until they all finish.

suspend fun <T, R> Iterable<T>.mapAsync(
    transform: suspend (T) -> R
): List<R> = coroutineScope {
    map { async { transform(it) } }.awaitAll()
}

This works fine in a very simple scenario. However, I then wanted to display the progress, with the number of items that have been successfully processed. I introduced an actor to receive a signal whenever an item has been processed and count the progress, then forward progress updates to another channel.

suspend fun <T, R> Iterable<T>.mapAsync(
    progressChannel: SendChannel<Int>,
    transform: suspend (T) -> R
): List<R> = coroutineScope {
    val progressActor = actor<Unit> {
        var index = 0
        for (unit in channel) {
            progressChannel.send(index++)
        }
    }
    val deferredList = map {
        async {
            val result = transform(it)
            progressActor.send(Unit)
            result
        }
    }
    val result = deferredList.awaitAll()
    progressActor.close()
    result
}

However, this implementation doesn't work. The progress updates are only received after all the items have been processed. To reproduce, I set up the following unit test.

@Test
fun testMapAsync() = runBlocking<Unit> {
    withContext(newFixedThreadPoolContext(4, "")) {
        val progressActor = actor<Int> {
            for (progress in this) {
                println("Progress $progress")
            }
        }

        (1..100).mapAsync(progressActor) {
            Thread.sleep(200)
            println("Processed item $it")
        }

        progressActor.close()
    }
}

Rather than printing the progress incrementally, shortly after each item is processed, this test first prints all the statements indicating that items were processed, and only then prints all the statements indicating that progress updates were received.

From what I understand, this is because the dispatcher is "fair" (FIFO): since all the worker coroutines are enqueued immediately, they are executed before any other coroutine, including the one which updates the progress.
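To make the effect concrete, here is a minimal standalone sketch (mine, not from the issue) of that FIFO behavior: a latecomer coroutine enqueued on the same dispatcher only runs after the earlier CPU-bound coroutines have drained from the queue.

import kotlinx.coroutines.*

// Minimal sketch: 100 CPU-bound coroutines are enqueued first, so the
// latecomer is only dispatched after they have all completed, even
// though it was ready almost immediately.
fun main() = runBlocking {
    val pool = newFixedThreadPoolContext(4, "demo")
    repeat(100) {
        launch(pool) { Thread.sleep(50) } // simulated CPU-bound work
    }
    launch(pool) { println("Latecomer finally ran") }
}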

Existing Possible Solutions and Drawbacks

Dedicated Dispatcher

This method simply relies on having a dedicated dispatcher for the worker coroutines. The drawbacks are obvious: it allocates extra threads that must be sized manually, and a dedicated dispatcher per type of task doesn't scale. It's also possible to invert the solution by having a dedicated dispatcher for the progress actor, but that doesn't fully solve the issue either.
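For completeness, a minimal sketch of this workaround (my code, not from the issue; the function name is made up):

import kotlinx.coroutines.*

// Workers run on their own pool, so they can only saturate that pool;
// coroutines on other dispatchers (such as the progress actor) keep
// getting scheduled.
val workerDispatcher = newFixedThreadPoolContext(4, "workers")

suspend fun <T, R> Iterable<T>.mapAsyncOnWorkers(
    transform: suspend (T) -> R
): List<R> = coroutineScope {
    map { async(workerDispatcher) { transform(it) } }.awaitAll()
}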

Fan-Out

It's possible to use the "Fan-out" mechanism described in the coroutine guide, to manually parallelize the processing by pulling from a channel.

const val PARALLELISM = 4

suspend fun <T, R> Iterable<T>.mapAsync(
    progressChannel: SendChannel<Int>,
    transform: suspend (T) -> R
): List<R> = coroutineScope {
    val result = mutableListOf<R>()
    val progressActor = actor<R> {
        var index = 0
        for (item in channel) {
            result += item
            progressChannel.send(index++)
        }
    }
    val source = produce {
        forEach { send(it) }
    }
    List(PARALLELISM) {
        launch {
            for (item in source) {
                progressActor.send(transform(item))
            }
        }
    }.joinAll()
    progressActor.close()
    result
}

When doing this, the initial order of the elements isn't maintained, and the parallelism has to be explicitly provided rather than relying on the default dispatcher to optimize for the number of cores (which could also be done manually, but seems like repetition).
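Both drawbacks can be worked around at the cost of extra code. Here is one possible sketch (mine, not part of the issue) that defaults the parallelism to the core count and tags items with their index so the original order can be restored; progress reporting is omitted for brevity.

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

suspend fun <T, R> Iterable<T>.mapAsyncOrdered(
    parallelism: Int = Runtime.getRuntime().availableProcessors(),
    transform: suspend (T) -> R
): List<R> = coroutineScope {
    // Source channel of (index, item) pairs that the workers pull from.
    val source = produce {
        forEachIndexed { index, item -> send(index to item) }
    }
    // This channel closes once the worker coroutines (its children) finish.
    val results = produce {
        repeat(parallelism) {
            launch {
                for ((index, item) in source) send(index to transform(item))
            }
        }
    }
    results.toList().sortedBy { it.first }.map { it.second }
}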

Producer Channel

Rather than using map on a list to call async, use produce to create a channel. This artificially introduces suspension points so that the worker coroutines aren't all added to the dispatcher queue immediately, but instead are added gradually as the dispatcher works through its queue. In other words, since adding workers is itself done in a coroutine, it only happens at the rate at which the coroutine dispatcher becomes available.

suspend fun <T, R> Iterable<T>.mapAsync(
    progressChannel: SendChannel<Int>,
    transform: suspend (T) -> R
): List<R> = coroutineScope {
    val progressActor = actor<Unit> {
        var index = 0
        for (unit in this) {
            progressChannel.send(index++)
        }
    }
    val deferredChannel = produce<Deferred<R>> {
        forEach {
            send(async {
                val result = transform(it)
                progressActor.send(Unit)
                result
            })
        }
    }
    val result = deferredChannel.toList().awaitAll()
    progressActor.close()
    result
}

This seems like a hack, and I'm unsure of how it behaves when the worker coroutine has multiple suspension points, causing the dispatcher queue to "roll" more frequently.

@elizarov
Contributor

elizarov commented Mar 1, 2019

The plan is to eventually add a ready-to-use primitive for that purpose; this is tracked under issue #172.

@uberto

uberto commented Mar 1, 2019 via email

@fvasco
Contributor

fvasco commented Mar 2, 2019

In my opinion the issue is not really addressed, and new operators, as proposed, do not solve it.

@MartinDevi's solutions work well for this particular use case, but launching multiple mapAsync calls concurrently can raise the same issue; the same probably applies to the later proposals.

The coroutine scheduler is not fair (a multi-threaded executor cannot be truly fair); in my opinion the point is that the scheduler is not preemptive, so some large batches can occupy it for an unbounded time.

Using only coroutines, I consider a dedicated dispatcher for these tasks a solution for this issue: bounded like the Default dispatcher but using a different set of resources. This should fix this and similar issues.
This idea can probably be enhanced; a similar solution, for an 8-CPU computer, is to use 7 + 1 threads for the Default dispatcher and the same 7 plus another 1 thread for a background dispatcher.
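If I understand the idea correctly, a rough sketch (my code, with sizes picked arbitrarily) would be something like:

import kotlinx.coroutines.*

// A second bounded dispatcher for large batches, so they occupy their
// own threads instead of the Default dispatcher's queue.
val backgroundDispatcher = newFixedThreadPoolContext(8, "Background")

suspend fun <T> runInBackground(block: suspend CoroutineScope.() -> T): T =
    withContext(backgroundDispatcher, block)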

Maybe I really missed something, but I am curious to know what; thank you.

@MartinDevi
Author

Thanks for all your input.

IMO there are two main points to take away from this issue:

  1. The standard library should contain a dedicated function on iterables (or maybe just collections, because some solutions are easier to implement if the size is known in advance) to handle and optimize the map { async { /* ... */ } }.awaitAll() pattern, because implementing this pattern correctly is non-trivial (a possible shape is sketched after this list).
  2. This standard function should implement a mechanism to "pull" items from the list progressively, rather than immediately queuing coroutines for all of them, so that the dispatcher isn't immediately saturated.
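A possible shape for such a function, purely as illustration (the name and signature are mine, not an agreed API):

// Strawman signature only: concurrency is chosen by the implementation,
// and items are pulled progressively rather than all queued up front
// (see the fan-out and producer sketches above).
public suspend fun <T, R> Collection<T>.mapParallel(
    transform: suspend (T) -> R
): List<R> = TODO("strawman, not implemented")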

In that regard, while I agree that issue #172 would help provide a solution, I think we need a dedicated function for this use case. Ideally it shouldn't require explicitly specifying the concurrency. Standard solutions which involve adding dedicated dispatchers for each type of task would ultimately lead to more confusion, and as @fvasco mentioned, they won't scale well.

@fvasco
Contributor

fvasco commented Mar 4, 2019

Regarding the suggested point 2, this mechanism should be global, not local.

In your first example, on a 4-CPU machine, setting PARALLELISM = 4 works well to decipher one file and update the UI.
In the future you might define a method to decipher all files in a folder using mapAsync, and later another method to decipher all folders on a disk.
This last method launches 4 × 4 × 4 = 64 parallel tasks, so the supposed parallelism is not honored.
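To illustrate the composition problem with code, a hypothetical sketch (the decipher* functions are mine), reusing the mapAsyncOrdered sketch from above with an explicit parallelism of 4 per call:

import java.io.File

// Each level honors its local limit, but the limits multiply: with
// decipherFile itself decomposed into 4 parallel chunks, this becomes
// 4 × 4 × 4 = 64 concurrent tasks.
suspend fun decipherFile(file: File): ByteArray = TODO("CPU-intensive work")

suspend fun decipherFolder(folder: File): List<ByteArray> =
    folder.listFiles()!!.toList()
        .mapAsyncOrdered(parallelism = 4) { decipherFile(it) }   // 4 workers

suspend fun decipherDisk(root: File): List<List<ByteArray>> =
    root.listFiles()!!.toList()
        .mapAsyncOrdered(parallelism = 4) { decipherFolder(it) } // 4 × 4 = 16 concurrent decipherFile calls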

@uberto

uberto commented Mar 4, 2019 via email
