Introduce single-producer, single-consumer channel abstraction #113

Closed · elizarov opened this issue Sep 8, 2017 · 8 comments

@elizarov (Contributor) commented Sep 8, 2017

We are observing that in many cases coordination between coroutines, and between a coroutine and the blocking world, requires a single-producer single-consumer channel that can be implemented much more cheaply than the multi-producer multi-consumer channels that are currently available in the library. This proposal is to add the corresponding single-producer single-consumer abstractions to the kotlinx.coroutines library, together with efficient implementation classes for them.

@akarnokd

I've implemented something like that for my experiments. It's not a fully fledged Channel implementation, and I'm not 100% certain about the await/notify logic. I'm interested because the given await/notify pair may form the basis of some RxJava-to-coroutine interoperation functions.

It appears to be working for me, but I only managed to save 2 milliseconds overall in the Scrabble benchmark compared to Channel(1), no matter the capacity.

// Imports added for completeness; this assumes the 2017-era experimental
// coroutines API (pre-1.3 Continuation with resume/resumeWithException).
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.atomic.AtomicReferenceArray
import kotlin.coroutines.experimental.Continuation
import kotlin.coroutines.experimental.CoroutineContext
import kotlin.coroutines.experimental.suspendCoroutine

typealias Cont = Continuation<Unit>

// Lock-free SPSC ring buffer; capacity must be a power of two, because
// indices are mapped to array slots with `and (capacity - 1)`.
class SpscArrayChannel<T>(private val capacity: Int) : AtomicReferenceArray<T>(capacity) {

    // Parked continuations: `empty` holds a consumer waiting for data,
    // `full` holds a producer waiting for free space.
    private val empty = AtomicReference<Cont?>()
    private val full = AtomicReference<Cont?>()

    private val producerIndex = AtomicLong()
    private val consumerIndex = AtomicLong()

    suspend fun send(t: T) {
        val producerIndex = this.producerIndex
        val pi = producerIndex.get()
        val cap = capacity
        val offset = pi.toInt() and (cap - 1)

        val consumerIndex = this.consumerIndex

        while (true) {
            val ci = consumerIndex.get()

            if (ci + cap == pi) {
                // Buffer is full: park until the consumer frees a slot.
                suspendCoroutine<Unit> { cont -> await(full, cont) }
            } else {
                lazySet(offset, t)
                producerIndex.set(pi + 1)
                if (pi == consumerIndex.get()) {
                    // Buffer was empty: wake a possibly parked consumer.
                    notify(empty)
                }
                break
            }
        }
    }

    suspend fun receive(): T {
        val consumerIndex = this.consumerIndex
        val ci = consumerIndex.get()
        val cap = capacity
        val offset = ci.toInt() and (cap - 1)
        val producerIndex = this.producerIndex

        while (true) {
            val pi = producerIndex.get()

            if (ci == pi) {
                // Buffer is empty: park until the producer publishes an element.
                suspendCoroutine<Unit> { cont -> await(empty, cont) }
            } else {
                val v = get(offset)
                lazySet(offset, null)
                consumerIndex.set(ci + 1)
                if (ci + cap == producerIndex.get()) {
                    // Buffer was full: wake a possibly parked producer.
                    notify(full)
                }
                return v
            }
        }
    }
}

// Resumes a parked continuation, or leaves TOKEN behind so that the next
// `await` call resumes itself immediately.
fun notify(ref: AtomicReference<Cont?>) {
    while (true) {
        val cont = ref.get()
        if (cont != null && cont != TOKEN) {
            if (ref.compareAndSet(cont, null)) {
                cont.resume(Unit)
                break
            }
        } else {
            if (ref.compareAndSet(cont, TOKEN)) {
                break
            }
        }
    }
}

// Parks `cont`, or resumes it immediately if `notify` already left TOKEN.
fun await(ref: AtomicReference<Cont?>, cont: Continuation<Unit>) {
    while (true) {
        val a = ref.get()
        if (a == TOKEN) {
            if (ref.compareAndSet(a, null)) {
                cont.resume(Unit)
                break
            }
        } else {
            if (ref.compareAndSet(a, cont)) {
                break
            }
        }
    }
}

// Sentinel meaning "notified before anyone awaited"; it is never resumed.
val TOKEN: Cont = object : Continuation<Unit> {
    override val context: CoroutineContext
        get() = throw UnsupportedOperationException()

    override fun resume(value: Unit) {
        throw UnsupportedOperationException()
    }

    override fun resumeWithException(exception: Throwable) {
        throw UnsupportedOperationException()
    }
}

@elizarov
Contributor Author

@akarnokd I have a few theories about potential places where performance could be wasted. MPMC channels are definitely not the only inefficiency that we can fix. I am planning to test those theories. Thanks for the reminder about the Scrabble benchmark as a playground for that.

@elizarov
Contributor Author

Here is the general thinking about this issue as of now. Splitting SPSC channels and MPMC channels at the level of the type system is going to add a lot of complications and conceptual weight. So the plan is to first try to optimize the implementation of the existing MPMC channel as much as we can.

The idea is that a channel can start in SPSC mode and upgrade itself to a full-blown MPMC channel (with queues and stuff) only upon detection of an actual MP/MC situation. This should give us almost all the performance benefits on SPSC channels, since they have to contain some "concurrency detection" logic anyway, even if only to be able to "fail fast".
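The "upgrade on detected contention" idea can be sketched roughly as follows. This is only an illustration, not the kotlinx.coroutines API: the names `UpgradeDetector`, `send`, and `upgraded` are made up. Each send pays a single CAS, and the structure flips permanently to MPMC mode the first time it observes a second concurrent producer.

```kotlin
import java.util.concurrent.atomic.AtomicReference

// Hypothetical sketch: a channel-like cell that starts in SPSC mode and
// upgrades the first time it notices a second concurrent sender.
// Detection costs one CAS per send on the fast path.
class UpgradeDetector {
    @Volatile
    var upgraded = false       // false = still in the cheap SPSC mode
        private set

    private val activeSender = AtomicReference<Thread?>(null)

    fun <R> send(block: () -> R): R {
        val me = Thread.currentThread()
        if (!activeSender.compareAndSet(null, me)) {
            // Another producer is inside send() right now: upgrade once.
            upgraded = true
        }
        try {
            return block()     // the actual element transfer would go here
        } finally {
            activeSender.compareAndSet(me, null)
        }
    }
}

fun main() {
    val d = UpgradeDetector()
    repeat(3) { d.send { /* single-threaded sends */ } }
    println(d.upgraded) // a lone producer never triggers the upgrade: false
}
```

A real implementation would also detect a second consumer symmetrically and delegate to the full MPMC machinery after the flip; this sketch only shows why detection can be as cheap as one volatile-read/CAS pair per operation.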

It is very important to note, that SPSC channels also enable other optimizations downstream (like reusing objects that manage coroutine cancellation). We are also looking at the shape of the emitted bytecode and whether it is preventing HotSpot from doing optimizations or not.

@akarnokd

I don't think you'd gain much, because detecting multiple concurrent uses at either end requires pretty much the same atomics you'd use in a regular MP/MC situation.

@elizarov
Contributor Author

We'll see how far we can push it. Of course, we'll have to use one volatile-read/CAS pair in the upgradeable channel implementation, so it is not going to come for free. However, we always have the option to introduce a separate SPSC abstraction without atomics later on, if that is found to bring significant performance benefits. We can optimize it specifically for data-processing pipelines and can even designate a separate type for it.

The other option on the table is akin to "loop fusion". We can detect produce { ... }.filter { ... } and other similar pipelines right at the moment of their construction (on the invocation of filter in this case) and fuse the filtering coroutine into the producer coroutine, without any channels in between them whatsoever. I think we can afford to pay the cost of a CAS once (during the invocation of filter) to configure the pipeline. The benefit is that it is going to be very fast at piping lots of elements, because of the minimal per-element overhead.
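The fusion idea can be illustrated with a tiny synchronous sketch (all names here are hypothetical, and the real pipelines would be suspending): filter does not allocate a channel, it merely wraps the producer's emission function once, at construction time, so the per-element cost is a plain function call with no atomics.

```kotlin
// Hypothetical illustration of "loop fusion" between pipeline stages.
// FusedSource is a made-up name, not a kotlinx.coroutines type.
class FusedSource<T>(private val emitAll: ((T) -> Unit) -> Unit) {

    // Fusion happens here, once, when the pipeline is constructed:
    // the filtering step becomes part of the producer's own loop.
    fun filter(predicate: (T) -> Boolean): FusedSource<T> =
        FusedSource { downstream ->
            emitAll { if (predicate(it)) downstream(it) }
        }

    // Terminal operation: drives the fused loop to completion.
    fun toList(): List<T> {
        val out = ArrayList<T>()
        emitAll { out.add(it) }
        return out
    }
}

fun main() {
    val evens = FusedSource<Int> { emit -> (1..10).forEach(emit) }
        .filter { it % 2 == 0 }
        .toList()
    println(evens) // [2, 4, 6, 8, 10]
}
```

In a real suspending pipeline the one-time configuration cost (e.g. a CAS during the invocation of filter) would replace the per-element channel traffic shown being avoided here.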

Basically, the ideal performance picture is to have O(1) atomic operations per N elements transferred via the pipeline. If we can reach it, then this whole exercise with loop fusion makes sense. However, it is not going to be easy to reach, since, at the least, the current support for cancellation is based on atomics, and we have to check for cancellation O(N) times when piping N elements.

Again, we have several options here. We'll have to either redesign our approach to cancellation tracking or get rid of cancellation completely in order to make it fast; e.g. we can make fused loops non-cancellable and allow for cancellation only at suspending asynchronous operations. The latter is what we actually do now (we only make it cancellable, and thus pay the cancellation price, at the moments of actual suspension), but that is causing some debate and discussion (see #111).

@elizarov
Contributor Author

I wrote a proof-of-concept for suspension-based reactive, functional, single-producer single-consumer abstraction (not a concurrent channel) and it shows very promising performance on my simple benchmark. See here for details: reactor/reactor-core#979 (comment)

@pull-vert

pull-vert commented Feb 10, 2018

Hi, I forked your StreamBenchmarks project and just opened a pull request with a SpScChannel implementation, plus 2 new benchmarks to compare the async (with buffer) performance:

  1. SourceInline with existing Channel (MpMc)
  2. SourceInline with this new SpScChannel

Here are the results on my computer:

testSourceInlineDeep                  avgt  20    8,003 ±  0,145  ms/op
testSourceInlineThreadBuffer128MpMc   avgt  20  463,736 ± 28,238  ms/op
testSourceInlineThreadBuffer128SpSc   avgt  20  260,945 ± 10,363  ms/op

There are probably optimisations to make for better results, but this is a start :)

@elizarov
Contributor Author

elizarov commented Mar 14, 2018

This issue is superseded by support for cold streams as described in #254 and #285. There will be SPSC channels used internally, to speed up transfers between different contexts, but no public-facing abstractions for SPSC channels.
