
Support "worker pool" pattern in actor builder and other related operators #172

Open
elizarov opened this issue Nov 29, 2017 · 15 comments

Comments

@elizarov
Contributor

elizarov commented Nov 29, 2017

The actor builder should natively support the "worker pool" pattern via an additional optional parameter, concurrency, that defaults to 1, so that if you have a list of requests, they can all be processed concurrently, with a specified concurrency limit, using simple code like this:

val reqs: List<Request> = ...
val workers = actor(concurrency = n) { 
    for (request in channel) processRequest(request)
}

This pattern seems to be quite common, with requests either stored in a list or received from some other channel, so the proposal is to add a concurrency parameter to map and consumeEach, too, so that one can write something like:

incomingRequests.consumeEach(concurrency = n) { processRequest(it) }

UPDATE: We will consistently call it concurrency here. We can have dozens of concurrent coroutines which run on a single CPU core. We will reserve the name parallelism to denote limits on the number of CPU cores that are used.
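For comparison, the consumeEach(concurrency = n) idea can be approximated with a plain fan-out over a channel. This is a minimal sketch, assuming kotlinx.coroutines 1.5 or later; consumeEachConcurrently is a hypothetical name, not a library function, and it launches all n workers eagerly.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// Hypothetical helper (not part of kotlinx.coroutines).
// Processes every element with at most `concurrency` actions running at once.
suspend fun <E> ReceiveChannel<E>.consumeEachConcurrently(
    concurrency: Int,
    action: suspend (E) -> Unit
) {
    require(concurrency > 0) { "concurrency must be positive, was $concurrency" }
    // consume { } cancels the channel if a worker fails, mirroring consumeEach
    consume {
        coroutineScope {
            repeat(concurrency) {
                launch {
                    // Fan-out: all workers iterate the same channel,
                    // so each element is delivered to exactly one worker
                    for (element in this@consumeEachConcurrently) action(element)
                }
            }
        }
    }
}
```

A List<Request> can be fed through such a channel, for example with produce { reqs.forEach { send(it) } }.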

@elizarov elizarov changed the title Suppor "worker pool" pattern in actor builder and related operators Support "worker pool" pattern in actor builder and other related operators Nov 29, 2017
@SolomonSun2010

@enleur

enleur commented Mar 7, 2018

Is this implementation of map too naive?

fun <E, R> ReceiveChannel<E>.map(
    context: CoroutineContext = kotlinx.coroutines.experimental.Unconfined,
    parallelism: Int = 1,
    transform: suspend (E) -> R
): ReceiveChannel<R> = produce(context, capacity = parallelism) {
    (0 until parallelism).map {
        launch(context) {
            consumeEach {
                send(transform(it))
            }
        }
    }.forEach { it.join() }
}

@elizarov
Contributor Author

elizarov commented Mar 9, 2018

@enleur This is close. However, I'd like to have a slightly more efficient implementation that launches up to n coroutines only as they are needed, so that it starts up efficiently even for very large values of n.
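One way to get this lazy start-up is to hand each element to an idle worker if one is already waiting, and to launch a new worker only when none is. A minimal sketch under these assumptions: kotlinx.coroutines 1.5 or later, concurrentMap is a hypothetical name, it takes the CoroutineScope as a receiver instead of a context parameter, and output order is not preserved.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// Hypothetical helper (not part of kotlinx.coroutines). Output order is NOT preserved.
@OptIn(ExperimentalCoroutinesApi::class)
fun <E, R> CoroutineScope.concurrentMap(
    source: ReceiveChannel<E>,
    concurrency: Int,
    transform: suspend (E) -> R
): ReceiveChannel<R> {
    require(concurrency > 0) { "concurrency must be positive, was $concurrency" }
    return produce {
        // Rendezvous channel: trySend succeeds only if a worker is already waiting
        val tasks = Channel<E>()
        var workers = 0
        for (element in source) {
            when {
                // An idle worker took the element immediately
                tasks.trySend(element).isSuccess -> {}
                // Everyone is busy but we are below the limit: start one more worker
                workers < concurrency -> {
                    workers++
                    launch {
                        send(transform(element))
                        for (next in tasks) send(transform(next))
                    }
                }
                // At the limit: suspend until some worker asks for more work
                else -> tasks.send(element)
            }
        }
        // No more input; produce closes its output only after all workers finish
        tasks.close()
    }
}
```

With a very large concurrency and a slow source, only as many workers are started as there are elements in flight.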

@dobriy-eeh

As a proposal, here is an alternative implementation:

suspend fun <T> forkJoin(
        context: CoroutineContext = DefaultDispatcher,
        start: CoroutineStart = CoroutineStart.DEFAULT,
        outerBlock: (fork: (suspend () -> T) -> Unit) -> Unit
): List<T> {
    val deferreds = ArrayList<Deferred<T>>()
    outerBlock({ deferreds.add(async(context, start) { it() }) })
    return deferreds.map { it.await() }
}

Usage example 1:

val stream = listOf(1, 2, 3).stream()
val results = forkJoin<Int> { fork ->
    stream.forEach { fork { suspendFunc(it) } }
}

Usage example 2:

val results = forkJoin<Int> { fork ->
    for (i in 1..5) {
        if (i % 2 == 0)
            continue

        fork { suspendFunc(i) }
    }
}

The main advantage: this is quite flexible with respect to the outer "looping" code.
You are not limited to a strict interface for the source data, for example streams only or channels only.
You can use any language feature to organize the fork loop: for, if, streams, and so on.

Also, you are not limited to exactly one 'request' parameter for the processing function; you can use a function with any number of parameters.
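Note that forkJoin as written starts every forked task at once, with no concurrency limit, and because it uses the top-level experimental async, a failure in one task does not cancel the others. A minimal sketch of a bounded, structured variant, assuming a current kotlinx.coroutines with Semaphore; forkJoinLimited is a hypothetical name:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withPermit

// Hypothetical helper. Results are returned in fork order.
// coroutineScope makes every forked task a child: one failure cancels the rest.
suspend fun <T> forkJoinLimited(
    concurrency: Int,
    outerBlock: (fork: (suspend () -> T) -> Unit) -> Unit
): List<T> = coroutineScope {
    val semaphore = Semaphore(concurrency)
    val deferreds = ArrayList<Deferred<T>>()
    // Each fork starts a child coroutine that waits for a permit before running
    outerBlock { task -> deferreds += async { semaphore.withPermit { task() } } }
    deferreds.awaitAll()
}
```

Usage mirrors example 2: forkJoinLimited<Int>(2) { fork -> for (i in 1..5) { if (i % 2 == 0) continue; fork { suspendFunc(i) } } }.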

@fvasco
Contributor

fvasco commented Jul 18, 2018

Does concurrent map preserve the order?

Should we introduce an optional parameter preserveOrder: Boolean = true for some operators (e.g. map, filter, ...)?

@elizarov
Contributor Author

Sometimes you need the order preserved, sometimes you do not. I wonder what the default should be, and whether it should be controlled by a boolean or by separate operators.
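When order matters, one option is to start each transformation as an async in arrival order and await the resulting Deferreds in that same order. A minimal sketch, assuming kotlinx.coroutines 1.5 or later; orderedConcurrentMap is a hypothetical name, and a Semaphore caps how many transformations run at once:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.sync.Semaphore

// Hypothetical helper: concurrent map that preserves the input order.
@OptIn(ExperimentalCoroutinesApi::class)
fun <E, R> CoroutineScope.orderedConcurrentMap(
    source: ReceiveChannel<E>,
    concurrency: Int,
    transform: suspend (E) -> R
): ReceiveChannel<R> {
    require(concurrency > 0) { "concurrency must be positive, was $concurrency" }
    return produce {
        val semaphore = Semaphore(concurrency)
        // Deferred results queued in arrival order; the buffer limits how far ahead we run
        val pending = Channel<Deferred<R>>(capacity = concurrency)
        launch {
            for (element in source) {
                semaphore.acquire() // at most `concurrency` transforms in flight
                pending.send(async {
                    try {
                        transform(element)
                    } finally {
                        semaphore.release()
                    }
                })
            }
            pending.close()
        }
        // Awaiting in queue order restores the original input order
        for (deferred in pending) send(deferred.await())
    }
}
```

The trade-off is head-of-line blocking: a slow element delays the emission of faster ones behind it, which an unordered variant avoids.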

@elizarov
Contributor Author

elizarov commented Jul 20, 2018

Note that an alternative design for the parallel-processing use case is to introduce a dedicated parallel (?) combinator, so that channel.parallel().map { transform(it) } would perform transform in parallel for all incoming elements without preserving the order.

@fvasco
Contributor

fvasco commented Jul 20, 2018

I am considering the following signature; it encapsulates the parallel blocks and allows all current operators to be reused.

suspend fun <E, R> ReceiveChannel<E>.parallel(
        parallelism: Int,
        block: suspend ProducerScope<R>.(ReceiveChannel<E>) -> Unit
): ReceiveChannel<R>

or

suspend fun <E, R> ReceiveChannel<E>.parallel(
        parallelism: Int,
        block: suspend ReceiveChannel<E>.() -> ReceiveChannel<R>
): ReceiveChannel<R>
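A minimal sketch of the first signature, assuming it is reshaped as a CoroutineScope extension that takes the source channel as a parameter, since the blocks need a scope to be launched in; parallel here is a hypothetical helper, not a library function:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// Hypothetical helper: runs `parallelism` copies of `block`, all reading
// from the same source (fan-out) and sending into one output (fan-in).
@OptIn(ExperimentalCoroutinesApi::class)
fun <E, R> CoroutineScope.parallel(
    source: ReceiveChannel<E>,
    parallelism: Int,
    block: suspend ProducerScope<R>.(ReceiveChannel<E>) -> Unit
): ReceiveChannel<R> {
    require(parallelism > 0) { "parallelism must be positive, was $parallelism" }
    return produce {
        repeat(parallelism) {
            // Each copy runs with the shared ProducerScope as its receiver
            launch { this@produce.block(source) }
        }
        // produce closes the output channel after all children complete
    }
}
```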

@fvasco
Contributor

fvasco commented Jul 21, 2018

Let me take some time to explain my previous message.

The idea is to use a regular fork/join strategy: forking and joining with channels is quite easy, so parallel pipelines can be used to process items.

Multiple coroutines receive items from a single source ReceiveChannel and send results to the output channel.

suspend fun <E, R> ReceiveChannel<E>.pipelines(
        parallelism: Int,
        block: suspend ReceiveChannel<E>.() -> ReceiveChannel<R>
): ReceiveChannel<R>


val ids: ReceiveChannel<Int> = loadIds()
val largeItem = ids
        .pipelines(5) {
            map { loadItem(it) }
                    .filter { it.active }
        }
        .maxBy { it.size }

Unfortunately, with this syntax it is difficult to consume data in parallel, i.e. with consumeEach.

So an alternative syntax could be:

suspend fun <E, R> ReceiveChannel<E>.fork(
        parallelism: Int,
        block: suspend (ReceiveChannel<E>) -> R
): List<R>


val largeItem = ids
        .fork(5) {
            it.map { loadItem(it) }
                    .filter { it.active }
                    .maxBy { it.size }
        }
        .filterNotNull()
        .maxBy { it.size }

Obviously, consuming items in the fork function produces a List<Unit> and does not require the join phase.

I suspect that both operators are useful.
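The fork signature maps fairly directly onto coroutineScope plus async. A minimal sketch, assuming kotlinx.coroutines 1.x; this is one possible implementation of the proposed operator, not a library API. If any block throws, coroutineScope cancels the remaining blocks and rethrows:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// Sketch of the proposed fork operator: `parallelism` blocks share one source channel.
suspend fun <E, R> ReceiveChannel<E>.fork(
    parallelism: Int,
    block: suspend (ReceiveChannel<E>) -> R
): List<R> = coroutineScope {
    require(parallelism > 0) { "parallelism must be positive, was $parallelism" }
    // Each block competes for elements of the same channel; results are joined here
    List(parallelism) { async { block(this@fork) } }.awaitAll()
}
```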

@gildor
Contributor

gildor commented Oct 23, 2018

I want to bump this issue.
This pattern is so common that I see questions about implementing it at least once a week in the #coroutines channel on the Kotlin Slack, and quick ad-hoc implementations often have problems (similar to the problem we had before the awaitAll extensions, when simple extension functions just used map { it.await() }, which leaks coroutines in case of an error).

@rnett

rnett commented Jan 27, 2019

A potential implementation of consumeEach with gradual spin-up:

import java.util.concurrent.atomic.AtomicInteger
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.ReceiveChannel

// maxConcurrency <= 0 means "no upper limit"
suspend inline fun <E> ReceiveChannel<E>.consumeEach(
    maxConcurrency: Int,
    initialConcurrency: Int = 10,
    coroutineContext: CoroutineContext = EmptyCoroutineContext,
    crossinline action: suspend (E) -> Unit
) =
    withContext(coroutineContext) {

        require(initialConcurrency >= 0) { "Can not have a negative initialConcurrency" }
        require(maxConcurrency <= 0 || initialConcurrency <= maxConcurrency) {
            "initialConcurrency must be less than or equal to maxConcurrency"
        }

        val busy = AtomicInteger(0)

        // Iterating with `for` avoids the race between checking isClosedForReceive
        // and calling receive(), which could throw ClosedReceiveChannelException
        val workers = MutableList(initialConcurrency) {
            launch {
                for (element in this@consumeEach) {
                    busy.incrementAndGet()
                    action(element)
                    busy.decrementAndGet()
                }
            }
        }

        // Spin up one more worker only while every existing worker is busy
        while (isActive && !isClosedForReceive && (maxConcurrency <= 0 || workers.size < maxConcurrency)) {
            if (busy.get() == workers.size) {
                val received = receiveCatching()
                if (received.isClosed) break

                workers += launch {
                    busy.incrementAndGet()
                    action(received.getOrThrow())
                    busy.decrementAndGet()

                    for (element in this@consumeEach) {
                        busy.incrementAndGet()
                        action(element)
                        busy.decrementAndGet()
                    }
                }
            }
            delay(10)
        }

        workers.joinAll()
    }

I really dislike that while loop to check sizes. It may be possible to do some kind of fake-observable on busy and only launch a watcher coroutine when it hits max (and cancel it when it drops down).

Either way, it shouldn't be too terrible, as the loop quits once the spin-up is done and will often be suspended waiting to receive an element.

I'm also not sure if the joinAll() at the end is necessary, since AFAIK the coroutine scope should take care of any cleanup, but I'm not sure enough to leave it off.

@rnett

rnett commented Jan 27, 2019

This pattern is common enough even outside of actors (e.g. making a lot of web requests, but only having 10 in flight at a time) that it might be worth having a separate API for launching n coroutines and using that here, rather than vice versa. At the very least, there should be something similar for produce.

Something like:

coroutineScope{
    limitedConcurrency(concurrency = 10){
        (1..100).forEach{
            launch{ doThing() }
        }
    }
}

Only 10 doThing() calls would be executing at any given time.

Any launch would either be redirected to a worker thread, be forced to be lazy and started once there is room, or have its block held until there is room and only then be launched.
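The "held until there is room" behaviour can be sketched with a Semaphore: every launch starts immediately, but its body suspends until it obtains a permit. A minimal sketch, assuming kotlinx.coroutines with kotlinx.coroutines.sync; LimitedConcurrencyScope and limitedConcurrency are hypothetical names, and the wrapper's launch shadows the regular one inside the block:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withPermit

// Hypothetical wrapper: inside limitedConcurrency { }, this member launch
// is resolved before the CoroutineScope.launch extension.
class LimitedConcurrencyScope(private val scope: CoroutineScope, concurrency: Int) {
    private val semaphore = Semaphore(concurrency)

    // Starts immediately, but the body waits for a free permit before running
    fun launch(block: suspend CoroutineScope.() -> Unit): Job =
        scope.launch { semaphore.withPermit { block() } }
}

// Returns only after every coroutine launched in the block has completed
suspend fun limitedConcurrency(concurrency: Int, block: LimitedConcurrencyScope.() -> Unit) {
    require(concurrency > 0) { "concurrency must be positive, was $concurrency" }
    coroutineScope { LimitedConcurrencyScope(this, concurrency).block() }
}
```

One cost of this design is that all 100 coroutines exist up front (most of them suspended on the semaphore), whereas a fixed pool of workers would only ever create 10.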

@ultra-taco

ultra-taco commented Feb 16, 2019

I agree. Coming from RxJava, I really wish there were something like flatMap() with maxConcurrency that does not require channels.
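For Flow, kotlinx.coroutines provides flatMapMerge(concurrency), which is close to RxJava's flatMap with maxConcurrency. A minimal sketch of a concurrent, order-not-preserving map built on it; mapMerged is a hypothetical name, and flatMapMerge requires an opt-in (FlowPreview or ExperimentalCoroutinesApi, depending on the library version):

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Each element becomes a one-element inner flow; flatMapMerge collects at most
// `concurrency` inner flows at the same time and emits results as they complete.
@OptIn(FlowPreview::class, ExperimentalCoroutinesApi::class)
fun <T, R> Flow<T>.mapMerged(concurrency: Int, transform: suspend (T) -> R): Flow<R> =
    flatMapMerge(concurrency) { value -> flow { emit(transform(value)) } }
```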

@LDVSOFT

LDVSOFT commented Jul 29, 2023

It's been quite a while without updates; is this somewhere on the roadmap?

@singhmanu

singhmanu commented Sep 7, 2024

How does this handle errors? If one of the transformations fails, does there need to be a way to stop the other transformations, and if so, how? Or maybe this does not matter, since error handling should be part of the transformation block.
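Both policies can be expressed with structured concurrency. A minimal sketch, assuming kotlinx.coroutines with Semaphore; mapAllConcurrently and mapAllConcurrentlyCatching are hypothetical names. The first fails fast (coroutineScope cancels all siblings on the first exception and rethrows it); the second keeps the error handling inside each transformation, as suggested above, and returns one Result per element:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withPermit

// Fail-fast: the first exception cancels all other in-flight transformations
suspend fun <T, R> List<T>.mapAllConcurrently(
    concurrency: Int,
    transform: suspend (T) -> R
): List<R> = coroutineScope {
    val semaphore = Semaphore(concurrency)
    map { item -> async { semaphore.withPermit { transform(item) } } }.awaitAll()
}

// Collect-all: each failure is captured per element; the others keep running
suspend fun <T, R> List<T>.mapAllConcurrentlyCatching(
    concurrency: Int,
    transform: suspend (T) -> R
): List<Result<R>> = coroutineScope {
    val semaphore = Semaphore(concurrency)
    map { item ->
        async {
            semaphore.withPermit {
                try {
                    Result.success(transform(item))
                } catch (e: CancellationException) {
                    throw e // never swallow cancellation
                } catch (e: Exception) {
                    Result.failure(e)
                }
            }
        }
    }.awaitAll()
}
```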

Projects
None yet
Development

No branches or pull requests

10 participants