Skip to content

Commit

Permalink
Remove various obsolete code (#4196)
Browse files Browse the repository at this point in the history
Removed some code that's no longer relevant:

* Backward compatibility for Ktor 1.0.0,
* Some code that was more complex than needed to
  support that,
* A JDK 6 workaround,
* Other small things.
  • Loading branch information
dkhalanskyjb authored Aug 12, 2024
1 parent f0bdf00 commit b286646
Show file tree
Hide file tree
Showing 18 changed files with 81 additions and 388 deletions.
17 changes: 0 additions & 17 deletions kotlinx-coroutines-core/api/kotlinx-coroutines-core.api
Original file line number Diff line number Diff line change
Expand Up @@ -1258,23 +1258,6 @@ public final class kotlinx/coroutines/intrinsics/CancellableKt {
public static final fun startCoroutineCancellable (Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)V
}

public class kotlinx/coroutines/scheduling/ExperimentalCoroutineDispatcher : kotlinx/coroutines/ExecutorCoroutineDispatcher {
public synthetic fun <init> (II)V
public synthetic fun <init> (IIILkotlin/jvm/internal/DefaultConstructorMarker;)V
public fun <init> (IIJLjava/lang/String;)V
public synthetic fun <init> (IIJLjava/lang/String;ILkotlin/jvm/internal/DefaultConstructorMarker;)V
public fun <init> (IILjava/lang/String;)V
public synthetic fun <init> (IILjava/lang/String;ILkotlin/jvm/internal/DefaultConstructorMarker;)V
public final fun blocking (I)Lkotlinx/coroutines/CoroutineDispatcher;
public static synthetic fun blocking$default (Lkotlinx/coroutines/scheduling/ExperimentalCoroutineDispatcher;IILjava/lang/Object;)Lkotlinx/coroutines/CoroutineDispatcher;
public fun close ()V
public fun dispatch (Lkotlin/coroutines/CoroutineContext;Ljava/lang/Runnable;)V
public fun dispatchYield (Lkotlin/coroutines/CoroutineContext;Ljava/lang/Runnable;)V
public fun getExecutor ()Ljava/util/concurrent/Executor;
public final fun limited (I)Lkotlinx/coroutines/CoroutineDispatcher;
public fun toString ()Ljava/lang/String;
}

public final class kotlinx/coroutines/selects/OnTimeoutKt {
public static final fun onTimeout (Lkotlinx/coroutines/selects/SelectBuilder;JLkotlin/jvm/functions/Function1;)V
public static final fun onTimeout-8Mi8wO0 (Lkotlinx/coroutines/selects/SelectBuilder;JLkotlin/jvm/functions/Function1;)V
Expand Down
21 changes: 13 additions & 8 deletions kotlinx-coroutines-core/common/src/SchedulerTask.common.kt
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
package kotlinx.coroutines

/**
* A [Runnable] that's especially optimized for running in [Dispatchers.Default] on the JVM.
*
* Replacing a [SchedulerTask] with a [Runnable] should not lead to any change in observable behavior.
*
* An arbitrary [Runnable], once it is dispatched by [Dispatchers.Default], gets wrapped into a class that
* stores the submission time, the execution context, etc.
* For [Runnable] instances that we know are only going to be executed in dispatch procedures, we can avoid the
* overhead of separately allocating a wrapper, and instead have the [Runnable] contain the required fields
* on construction.
*
* When running outside the standard dispatchers, these new fields are just dead weight.
*/
internal expect abstract class SchedulerTask internal constructor() : Runnable

internal expect interface SchedulerTaskContext

@Suppress("EXTENSION_SHADOWED_BY_MEMBER")
internal expect val SchedulerTask.taskContext: SchedulerTaskContext

@Suppress("EXTENSION_SHADOWED_BY_MEMBER")
internal expect inline fun SchedulerTaskContext.afterTask()
16 changes: 4 additions & 12 deletions kotlinx-coroutines-core/common/src/internal/DispatchedTask.kt
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ internal abstract class DispatchedTask<in T> internal constructor(

final override fun run() {
assert { resumeMode != MODE_UNINITIALIZED } // should have been set before dispatching
val taskContext = this.taskContext
var fatalException: Throwable? = null
try {
val delegate = delegate as DispatchedContinuation<T>
Expand Down Expand Up @@ -107,8 +106,7 @@ internal abstract class DispatchedTask<in T> internal constructor(
// This instead of runCatching to have nicer stacktrace and debug experience
fatalException = e
} finally {
val result = runCatching { taskContext.afterTask() }
handleFatalException(fatalException, result.exceptionOrNull())
fatalException?.let { handleFatalException(it) }
}
}

Expand All @@ -130,15 +128,9 @@ internal abstract class DispatchedTask<in T> internal constructor(
* Fatal exception handling can be intercepted with [CoroutineExceptionHandler] element in the context of
* a failed coroutine, but such exceptions should be reported anyway.
*/
internal fun handleFatalException(exception: Throwable?, finallyException: Throwable?) {
if (exception === null && finallyException === null) return
if (exception !== null && finallyException !== null) {
exception.addSuppressed(finallyException)
}

val cause = exception ?: finallyException
internal fun handleFatalException(exception: Throwable) {
val reason = CoroutinesInternalError("Fatal exception in coroutines machinery for $this. " +
"Please read KDoc to 'handleFatalException' method and report this incident to maintainers", cause!!)
"Please read KDoc to 'handleFatalException' method and report this incident to maintainers", exception)
handleCoroutineException(this.delegate.context, reason)
}
}
Expand Down Expand Up @@ -203,7 +195,7 @@ internal inline fun DispatchedTask<*>.runUnconfinedEventLoop(
* This exception doesn't happen normally, only if we have a bug in implementation.
* Report it as a fatal exception.
*/
handleFatalException(e, null)
handleFatalException(e)
} finally {
eventLoop.decrementUseCount(unconfined = true)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,11 @@ package kotlinx.coroutines.internal
* where atomicfu doesn't support its tranformations.
*
* Have `Local` prefix to avoid AFU clashes during star-imports
*
* TODO: remove after https://youtrack.jetbrains.com/issue/KT-62423/
*/
internal expect class LocalAtomicInt(value: Int) {
fun get(): Int
fun set(value: Int)
fun decrementAndGet(): Int
}

internal inline var LocalAtomicInt.value
get() = get()
set(value) = set(value)
10 changes: 0 additions & 10 deletions kotlinx-coroutines-core/jsAndWasmShared/src/SchedulerTask.kt
Original file line number Diff line number Diff line change
@@ -1,13 +1,3 @@
package kotlinx.coroutines

internal actual abstract class SchedulerTask : Runnable

internal actual interface SchedulerTaskContext { }

private object TaskContext: SchedulerTaskContext { }

internal actual val SchedulerTask.taskContext: SchedulerTaskContext get() = TaskContext

@Suppress("NOTHING_TO_INLINE")
internal actual inline fun SchedulerTaskContext.afterTask() {}

11 changes: 5 additions & 6 deletions kotlinx-coroutines-core/jvm/src/Executors.kt
Original file line number Diff line number Diff line change
Expand Up @@ -117,13 +117,12 @@ private class DispatcherExecutor(@JvmField val dispatcher: CoroutineDispatcher)

internal class ExecutorCoroutineDispatcherImpl(override val executor: Executor) : ExecutorCoroutineDispatcher(), Delay {

/*
* Attempts to reflectively (to be Java 6 compatible) invoke
* ScheduledThreadPoolExecutor.setRemoveOnCancelPolicy in order to cleanup
* internal scheduler queue on cancellation.
*/
init {
removeFutureOnCancel(executor)
/* Attempt to invoke ScheduledThreadPoolExecutor.setRemoveOnCancelPolicy in order to clean up
* the internal scheduler queue on cancellation. */
if (executor is ScheduledThreadPoolExecutor) {
executor.removeOnCancelPolicy = true
}
}

override fun dispatch(context: CoroutineContext, block: Runnable) {
Expand Down
9 changes: 0 additions & 9 deletions kotlinx-coroutines-core/jvm/src/SchedulerTask.kt
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,3 @@ package kotlinx.coroutines
import kotlinx.coroutines.scheduling.*

internal actual typealias SchedulerTask = Task

internal actual typealias SchedulerTaskContext = TaskContext

@Suppress("EXTENSION_SHADOWED_BY_MEMBER")
internal actual val SchedulerTask.taskContext: SchedulerTaskContext get() = taskContext

@Suppress("NOTHING_TO_INLINE", "EXTENSION_SHADOWED_BY_MEMBER")
internal actual inline fun SchedulerTaskContext.afterTask() =
afterTask()
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package kotlinx.coroutines.flow.internal

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

internal actual class AbortFlowException actual constructor(
@JvmField @Transient actual val owner: Any
Expand Down
19 changes: 0 additions & 19 deletions kotlinx-coroutines-core/jvm/src/internal/Concurrent.kt
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package kotlinx.coroutines.internal

import java.lang.reflect.*
import java.util.*
import java.util.concurrent.*
import kotlin.concurrent.withLock as withLockJvm

@Suppress("ACTUAL_WITHOUT_EXPECT")
Expand All @@ -22,20 +20,3 @@ internal actual annotation class BenignDataRace()
@Suppress("NOTHING_TO_INLINE") // So that R8 can completely remove ConcurrentKt class
internal actual inline fun <E> identitySet(expectedSize: Int): MutableSet<E> =
Collections.newSetFromMap(IdentityHashMap(expectedSize))

private val REMOVE_FUTURE_ON_CANCEL: Method? = try {
ScheduledThreadPoolExecutor::class.java.getMethod("setRemoveOnCancelPolicy", Boolean::class.java)
} catch (e: Throwable) {
null
}

@Suppress("NAME_SHADOWING")
internal fun removeFutureOnCancel(executor: Executor): Boolean {
try {
val executor = executor as? ScheduledThreadPoolExecutor ?: return false
(REMOVE_FUTURE_ON_CANCEL ?: return false).invoke(executor, true)
return true
} catch (e: Throwable) {
return false // failed to setRemoveOnCancelPolicy, assume it does not removes future on cancel
}
}
8 changes: 4 additions & 4 deletions kotlinx-coroutines-core/jvm/src/internal/FastServiceLoader.kt
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package kotlinx.coroutines.internal

import kotlinx.coroutines.CoroutineExceptionHandler
import java.io.*
import java.net.*
import java.util.*
import java.util.jar.*
import java.util.zip.*
import kotlin.collections.ArrayList

/**
* Don't use JvmField here to enable R8 optimizations via "assumenosideeffects"
Expand Down Expand Up @@ -68,7 +68,7 @@ internal object FastServiceLoader {
// Also search for test-module factory
createInstanceOf(clz, "kotlinx.coroutines.test.internal.TestMainDispatcherFactory")?.apply { result.add(this) }
result
} catch (e: Throwable) {
} catch (_: Throwable) {
// Fallback to the regular SL in case of any unexpected exception
load(clz, clz.classLoader)
}
Expand All @@ -85,15 +85,15 @@ internal object FastServiceLoader {
return try {
val clz = Class.forName(serviceClass, true, baseClass.classLoader)
baseClass.cast(clz.getDeclaredConstructor().newInstance())
} catch (e: ClassNotFoundException) { // Do not fail if TestMainDispatcherFactory is not found
} catch (_: ClassNotFoundException) { // Do not fail if TestMainDispatcherFactory is not found
null
}
}

private fun <S> load(service: Class<S>, loader: ClassLoader): List<S> {
return try {
loadProviders(service, loader)
} catch (e: Throwable) {
} catch (_: Throwable) {
// Fallback to default service loader
ServiceLoader.load(service, loader).toList()
}
Expand Down
60 changes: 24 additions & 36 deletions kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ import kotlin.math.*
*
* ### Support for blocking tasks
*
* The scheduler also supports the notion of [blocking][TASK_PROBABLY_BLOCKING] tasks.
* The scheduler also supports the notion of [blocking][Task.isBlocking] tasks.
* When executing or enqueuing blocking tasks, the scheduler notifies or creates an additional worker in
* addition to the core pool size, so at any given moment, it has [corePoolSize] threads (potentially not yet created)
* available to serve CPU-bound tasks. To properly guarantee liveness, the scheduler maintains
Expand Down Expand Up @@ -425,7 +425,7 @@ internal class CoroutineScheduler(
block.taskContext = taskContext
return block
}
return TaskImpl(block, nanoTime, taskContext)
return block.asTask(nanoTime, taskContext)
}

// NB: should only be called from 'dispatch' method due to blocking tasks increment
Expand Down Expand Up @@ -514,7 +514,7 @@ internal class CoroutineScheduler(
*/
if (state === WorkerState.TERMINATED) return task
// Do not add CPU tasks in local queue if we are not able to execute it
if (task.mode == TASK_NON_BLOCKING && state === WorkerState.BLOCKING) {
if (!task.isBlocking && state === WorkerState.BLOCKING) {
return task
}
mayHaveLocalTasks = true
Expand Down Expand Up @@ -810,29 +810,26 @@ internal class CoroutineScheduler(
private fun inStack(): Boolean = nextParkedWorker !== NOT_IN_STACK

private fun executeTask(task: Task) {
val taskMode = task.mode
idleReset(taskMode)
beforeTask(taskMode)
runSafely(task)
afterTask(taskMode)
}

private fun beforeTask(taskMode: Int) {
if (taskMode == TASK_NON_BLOCKING) return
// Always notify about new work when releasing CPU-permit to execute some blocking task
if (tryReleaseCpu(WorkerState.BLOCKING)) {
signalCpuWork()
terminationDeadline = 0L // reset deadline for termination
if (state == WorkerState.PARKING) {
assert { task.isBlocking }
state = WorkerState.BLOCKING
}
}

private fun afterTask(taskMode: Int) {
if (taskMode == TASK_NON_BLOCKING) return
decrementBlockingTasks()
val currentState = state
// Shutdown sequence of blocking dispatcher
if (currentState !== WorkerState.TERMINATED) {
assert { currentState == WorkerState.BLOCKING } // "Expected BLOCKING state, but has $currentState"
state = WorkerState.DORMANT
if (task.isBlocking) {
// Always notify about new work when releasing CPU-permit to execute some blocking task
if (tryReleaseCpu(WorkerState.BLOCKING)) {
signalCpuWork()
}
runSafely(task)
decrementBlockingTasks()
val currentState = state
// Shutdown sequence of blocking dispatcher
if (currentState !== WorkerState.TERMINATED) {
assert { currentState == WorkerState.BLOCKING } // "Expected BLOCKING state, but has $currentState"
state = WorkerState.DORMANT
}
} else {
runSafely(task)
}
}

Expand Down Expand Up @@ -923,15 +920,6 @@ internal class CoroutineScheduler(
state = WorkerState.TERMINATED
}

// It is invoked by this worker when it finds a task
private fun idleReset(mode: Int) {
terminationDeadline = 0L // reset deadline for termination
if (state == WorkerState.PARKING) {
assert { mode == TASK_PROBABLY_BLOCKING }
state = WorkerState.BLOCKING
}
}

fun findTask(mayHaveLocalTasks: Boolean): Task? {
if (tryAcquireCpuPermit()) return findAnyTask(mayHaveLocalTasks)
/*
Expand Down Expand Up @@ -1013,12 +1001,12 @@ internal class CoroutineScheduler(

enum class WorkerState {
/**
* Has CPU token and either executes [TASK_NON_BLOCKING] task or tries to find one.
* Has CPU token and either executes a [Task.isBlocking]` == false` task or tries to find one.
*/
CPU_ACQUIRED,

/**
* Executing task with [TASK_PROBABLY_BLOCKING].
* Executing task with [Task.isBlocking].
*/
BLOCKING,

Expand Down
Loading

0 comments on commit b286646

Please sign in to comment.