Skip to content

Commit

Permalink
AMMEND
Browse files Browse the repository at this point in the history
  • Loading branch information
igoriakovlev committed Sep 12, 2023
1 parent 427a1fd commit 473b226
Show file tree
Hide file tree
Showing 3 changed files with 149 additions and 261 deletions.
138 changes: 11 additions & 127 deletions kotlinx-coroutines-core/js/src/JSDispatcher.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,50 +4,18 @@

package kotlinx.coroutines

import kotlinx.coroutines.internal.*
import org.w3c.dom.*
import kotlin.coroutines.*
import kotlin.js.Promise

private const val MAX_DELAY = Int.MAX_VALUE.toLong()
internal class ScheduledMessageQueue(private val dispatcher: SetTimeoutBasedDispatcher) : MessageQueue() {
val processQueue: dynamic = { process() }

private fun delayToInt(timeMillis: Long): Int =
timeMillis.coerceIn(0, MAX_DELAY).toInt()

internal sealed class SetTimeoutBasedDispatcher: CoroutineDispatcher(), Delay {
inner class ScheduledMessageQueue : MessageQueue() {
internal val processQueue: dynamic = { process() }

override fun schedule() {
scheduleQueueProcessing()
}

override fun reschedule() {
setTimeout(processQueue, 0)
}
}

internal val messageQueue = ScheduledMessageQueue()

abstract fun scheduleQueueProcessing()

override fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
parallelism.checkParallelism()
return this
}

override fun dispatch(context: CoroutineContext, block: Runnable) {
messageQueue.enqueue(block)
}

override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle {
val handle = setTimeout({ block.run() }, delayToInt(timeMillis))
return ClearTimeout(handle)
override fun schedule() {
dispatcher.scheduleQueueProcessing()
}

override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
val handle = setTimeout({ with(continuation) { resumeUndispatched(Unit) } }, delayToInt(timeMillis))
continuation.invokeOnCancellation(handler = ClearTimeout(handle).asHandler)
override fun reschedule() {
setTimeout(processQueue, 0)
}
}

Expand All @@ -57,48 +25,7 @@ internal object NodeDispatcher : SetTimeoutBasedDispatcher() {
}
}

internal object SetTimeoutDispatcher : SetTimeoutBasedDispatcher() {
override fun scheduleQueueProcessing() {
setTimeout(messageQueue.processQueue, 0)
}
}

private open class ClearTimeout(protected val handle: Int) : CancelHandler(), DisposableHandle {

override fun dispose() {
clearTimeout(handle)
}

override fun invoke(cause: Throwable?) {
dispose()
}

override fun toString(): String = "ClearTimeout[$handle]"
}

internal class WindowDispatcher(private val window: Window) : CoroutineDispatcher(), Delay {
private val queue = WindowMessageQueue(window)

override fun dispatch(context: CoroutineContext, block: Runnable) = queue.enqueue(block)

override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
val handle = window.setTimeout({ with(continuation) { resumeUndispatched(Unit) } }, delayToInt(timeMillis))
continuation.invokeOnCancellation(handler = WindowClearTimeout(handle).asHandler)
}

override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle {
val handle = window.setTimeout({ block.run() }, delayToInt(timeMillis))
return WindowClearTimeout(handle)
}

private inner class WindowClearTimeout(handle: Int) : ClearTimeout(handle) {
override fun dispose() {
window.clearTimeout(handle)
}
}
}

private class WindowMessageQueue(private val window: Window) : MessageQueue() {
internal class WindowMessageQueue(private val window: Window) : MessageQueue() {
private val messageName = "dispatchCoroutine"

init {
Expand All @@ -119,52 +46,9 @@ private class WindowMessageQueue(private val window: Window) : MessageQueue() {
}
}

/**
* An abstraction over JS scheduling mechanism that leverages micro-batching of dispatched blocks without
* paying the cost of JS callbacks scheduling on every dispatch.
*
* Queue uses two scheduling mechanisms:
* 1) [schedule] is used to schedule the initial processing of the message queue.
* JS engine-specific microtask mechanism is used in order to boost performance on short runs and a dispatch batch
* 2) [reschedule] is used to schedule processing of the queue after yield to the JS event loop.
* JS engine-specific macrotask mechanism is used not to starve animations and non-coroutines macrotasks.
*
* Yet there could be a long tail of "slow" reschedules, but it should be amortized by the queue size.
*/
internal abstract class MessageQueue : MutableList<Runnable> by ArrayDeque() {
val yieldEvery = 16 // yield to JS macrotask event loop after this many processed messages
private var scheduled = false

abstract fun schedule()

abstract fun reschedule()

fun enqueue(element: Runnable) {
add(element)
if (!scheduled) {
scheduled = true
schedule()
}
}

fun process() {
try {
// limit number of processed messages
repeat(yieldEvery) {
val element = removeFirstOrNull() ?: return@process
element.run()
}
} finally {
if (isEmpty()) {
scheduled = false
} else {
reschedule()
}
}
}
}

// We need to reference global setTimeout and clearTimeout so that it works on Node.JS as opposed to
// using them via "window" (which only works in browser)
private external fun setTimeout(handler: dynamic, timeout: Int = definedExternally): Int
private external fun clearTimeout(handle: Int = definedExternally)
internal external fun setTimeout(handler: dynamic, timeout: Int = definedExternally): Int
internal external fun clearTimeout(handle: Int = definedExternally)
internal fun setTimeout(window: WindowOrWorkerGlobalScope, handler: () -> Unit, timeout: Int): Int =
window.setTimeout(handler, timeout)
126 changes: 126 additions & 0 deletions kotlinx-coroutines-core/jsAndWasmShared/src/internal/JSDispatcher.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
* Copyright 2016-2023 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines

import kotlinx.coroutines.internal.*
import org.w3c.dom.*
import kotlin.coroutines.*

private const val MAX_DELAY = Int.MAX_VALUE.toLong()

private fun delayToInt(timeMillis: Long): Int =
timeMillis.coerceIn(0, MAX_DELAY).toInt()

internal sealed class SetTimeoutBasedDispatcher: CoroutineDispatcher(), Delay {
val messageQueue = ScheduledMessageQueue(this)

abstract fun scheduleQueueProcessing()

override fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
parallelism.checkParallelism()
return this
}

override fun dispatch(context: CoroutineContext, block: Runnable) {
messageQueue.enqueue(block)
}

override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle {
val handle = setTimeout({ block.run() }, delayToInt(timeMillis))
return ClearTimeout(handle)
}

override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
val handle = setTimeout({ with(continuation) { resumeUndispatched(Unit) } }, delayToInt(timeMillis))
continuation.invokeOnCancellation(handler = ClearTimeout(handle).asHandler)
}
}

internal class WindowDispatcher(private val window: Window) : CoroutineDispatcher(), Delay {
private val queue = WindowMessageQueue(window)

override fun dispatch(context: CoroutineContext, block: Runnable) = queue.enqueue(block)

override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
val handle = setTimeout(window, { with(continuation) { resumeUndispatched(Unit) } }, delayToInt(timeMillis))
continuation.invokeOnCancellation(handler = WindowClearTimeout(handle).asHandler)
}

override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle {
val handle = setTimeout(window, block::run, delayToInt(timeMillis))
return WindowClearTimeout(handle)
}

private inner class WindowClearTimeout(handle: Int) : ClearTimeout(handle) {
override fun dispose() {
window.clearTimeout(handle)
}
}
}

internal object SetTimeoutDispatcher : SetTimeoutBasedDispatcher() {
override fun scheduleQueueProcessing() {
setTimeout(messageQueue.processQueue, 0)
}
}

private open class ClearTimeout(protected val handle: Int) : CancelHandler(), DisposableHandle {

override fun dispose() {
clearTimeout(handle)
}

override fun invoke(cause: Throwable?) {
dispose()
}

override fun toString(): String = "ClearTimeout[$handle]"
}


/**
* An abstraction over JS scheduling mechanism that leverages micro-batching of dispatched blocks without
* paying the cost of JS callbacks scheduling on every dispatch.
*
* Queue uses two scheduling mechanisms:
* 1) [schedule] is used to schedule the initial processing of the message queue.
* JS engine-specific microtask mechanism is used in order to boost performance on short runs and a dispatch batch
* 2) [reschedule] is used to schedule processing of the queue after yield to the JS event loop.
* JS engine-specific macrotask mechanism is used not to starve animations and non-coroutines macrotasks.
*
* Yet there could be a long tail of "slow" reschedules, but it should be amortized by the queue size.
*/
internal abstract class MessageQueue : MutableList<Runnable> by ArrayDeque() {
val yieldEvery = 16 // yield to JS macrotask event loop after this many processed messages
private var scheduled = false

abstract fun schedule()

abstract fun reschedule()

fun enqueue(element: Runnable) {
add(element)
if (!scheduled) {
scheduled = true
schedule()
}
}

fun process() {
try {
// limit number of processed messages
repeat(yieldEvery) {
val element = removeFirstOrNull() ?: return@process
element.run()
}
} finally {
if (isEmpty()) {
scheduled = false
} else {
reschedule()
}
}
}
}
Loading

0 comments on commit 473b226

Please sign in to comment.