diff --git a/kotlinx-coroutines-core/common/src/EventLoop.common.kt b/kotlinx-coroutines-core/common/src/EventLoop.common.kt index 6bfcbf0cbf..84291a1b69 100644 --- a/kotlinx-coroutines-core/common/src/EventLoop.common.kt +++ b/kotlinx-coroutines-core/common/src/EventLoop.common.kt @@ -256,21 +256,7 @@ internal abstract class EventLoopImplBase: EventLoopImplPlatform(), Delay { // unconfined events take priority if (processUnconfinedEvent()) return 0 // queue all delayed tasks that are due to be executed - val delayed = _delayed.value - if (delayed != null && !delayed.isEmpty) { - val now = nanoTime() - while (true) { - // make sure that moving from delayed to queue removes from delayed only after it is added to queue - // to make sure that 'isEmpty' and `nextTime` that check both of them - // do not transiently report that both delayed and queue are empty during move - delayed.removeFirstIf { - if (it.timeToExecute(now)) { - enqueueImpl(it) - } else - false - } ?: break // quit loop when nothing more to remove or enqueueImpl returns false on "isComplete" - } - } + enqueueDelayedTasks() // then process one event from queue val task = dequeue() if (task != null) { @@ -283,6 +269,8 @@ internal abstract class EventLoopImplBase: EventLoopImplPlatform(), Delay { final override fun dispatch(context: CoroutineContext, block: Runnable) = enqueue(block) open fun enqueue(task: Runnable) { + // are there some delayed tasks that should execute before this one? If so, move them to the queue first. + enqueueDelayedTasks() if (enqueueImpl(task)) { // todo: we should unpark only when this delayed task became first in the queue unpark() @@ -336,6 +324,25 @@ internal abstract class EventLoopImplBase: EventLoopImplPlatform(), Delay { } } + /** Move all delayed tasks that are due to the main queue. */ + private fun enqueueDelayedTasks() { + val delayed = _delayed.value + if (delayed != null && !delayed.isEmpty) { + val now = nanoTime() + while (true) { + // make sure that moving from delayed to queue removes from delayed only after it is added to queue + // to make sure that 'isEmpty' and `nextTime` that check both of them + // do not transiently report that both delayed and queue are empty during move + delayed.removeFirstIf { + if (it.timeToExecute(now)) { + enqueueImpl(it) + } else + false + } ?: break // quit loop when nothing more to remove or enqueueImpl returns false on "isComplete" + } + } + } + private fun closeQueue() { assert { isCompleted } _queue.loop { queue -> diff --git a/kotlinx-coroutines-core/jvm/test/EventLoopsTest.kt b/kotlinx-coroutines-core/jvm/test/EventLoopsTest.kt index 9ab52fd220..551d1977c0 100644 --- a/kotlinx-coroutines-core/jvm/test/EventLoopsTest.kt +++ b/kotlinx-coroutines-core/jvm/test/EventLoopsTest.kt @@ -126,6 +126,21 @@ class EventLoopsTest : TestBase() { finish(4) } + /** + * Tests that, when delayed tasks are due on an event loop, they will execute earlier than the newly-scheduled + * non-delayed tasks. + */ + @Test + fun testPendingDelayedBeingDueEarlier() = runTest { + launch(start = CoroutineStart.UNDISPATCHED) { + delay(1) + expect(1) + } + Thread.sleep(100) + yield() + finish(2) + } + class EventSync { private val waitingThread = atomic(null) private val fired = atomic(false)