diff --git a/kotlinx-coroutines-core/common/src/JobSupport.kt b/kotlinx-coroutines-core/common/src/JobSupport.kt index 5fda74fc88..3252d1047f 100644 --- a/kotlinx-coroutines-core/common/src/JobSupport.kt +++ b/kotlinx-coroutines-core/common/src/JobSupport.kt @@ -159,12 +159,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren * If final state of the job is [Incomplete], then it is boxed into [IncompleteStateBox] * and should be [unboxed][unboxState] before returning to user code. */ - internal val state: Any? get() { - _state.loop { state -> // helper loop on state (complete in-progress atomic operations) - if (state !is OpDescriptor) return state - state.perform(this) - } - } + internal val state: Any? get() = _state.value /** * @suppress **This is unstable API and it is subject to change.** @@ -324,6 +319,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren private fun notifyCancelling(list: NodeList, cause: Throwable) { // first cancel our own children onCancelling(cause) + list.close(LIST_CANCELLATION_PERMISSION) notifyHandlers(list, cause) { it.onCancelling } // then cancel parent cancelParent(cause) // tentative cancellation -- does not matter if there is no parent @@ -355,8 +351,10 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren return parent.childCancelled(cause) || isCancellation } - private fun NodeList.notifyCompletion(cause: Throwable?) = + private fun NodeList.notifyCompletion(cause: Throwable?) { + close(LIST_ON_COMPLETION_PERMISSION) notifyHandlers(this, cause) { true } + } private inline fun notifyHandlers(list: NodeList, cause: Throwable?, predicate: (JobNode) -> Boolean) { var exception: Throwable? = null @@ -464,60 +462,94 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren node: JobNode ): DisposableHandle { node.job = this + // Create node upfront -- for common cases it just initializes JobNode.job field, + // for user-defined handlers it allocates a JobNode object that we might not need, but this is Ok. + val added = tryPutNodeIntoList(node) { state, list -> + if (node.onCancelling) { + /** + * We are querying whether the job was already cancelled when we entered this block. + * We can't naively attempt to add the node to the list, because a lot of time could pass between + * notifying the cancellation handlers (and thus closing the list, forcing us to retry) + * and reaching a final state. + * + * Alternatively, we could also try to add the node to the list first and then read the latest state + * to check for an exception, but that logic would need to manually handle the final state, which is + * less straightforward. + */ + val rootCause = (state as? Finishing)?.rootCause + if (rootCause == null) { + /** + * There is no known root cause yet, so we can add the node to the list of state handlers. + * + * If this call fails, because of the bitmask, this means one of the two happened: + * - [notifyCancelling] was already called. + * This means that the job is already being cancelled: otherwise, with what exception would we + * notify the handler? + * So, we can retry the operation: either the state is already final, or the `rootCause` check + * above will give a different result. + * - [notifyCompletion] was already called. + * This means that the job is already complete. + * We can retry the operation and will observe the final state. + */ + list.addLast(node, LIST_CANCELLATION_PERMISSION or LIST_ON_COMPLETION_PERMISSION) + } else { + /** + * The root cause is known, so we can invoke the handler immediately and avoid adding it. + */ + if (invokeImmediately) node.invoke(rootCause) + return NonDisposableHandle + } + } else { + /** + * The non-[onCancelling]-handlers are interested in completions only, so it's safe to add them at + * any time before [notifyCompletion] is called (which closes the list). + * + * If the list *is* closed, on a retry, we'll observe the final state, as [notifyCompletion] is only + * called after the state transition. + */ + list.addLast(node, LIST_ON_COMPLETION_PERMISSION) + } + } + when { + added -> return node + invokeImmediately -> node.invoke((state as? CompletedExceptionally)?.cause) + } + return NonDisposableHandle + } + + /** + * Puts [node] into the current state's list of completion handlers. + * + * Returns `false` if the state is already complete and doesn't accept new handlers. + * Returns `true` if the handler was successfully added to the list. + * + * [tryAdd] is invoked when the state is [Incomplete] and the list is not `null`, to decide on the specific + * behavior in this case. It must return + * - `true` if the element was successfully added to the list + * - `false` if the operation needs to be retried + */ + private inline fun tryPutNodeIntoList( + node: JobNode, + tryAdd: (Incomplete, NodeList) -> Boolean + ): Boolean { loopOnState { state -> when (state) { is Empty -> { // EMPTY_X state -- no completion handlers if (state.isActive) { - // try move to SINGLE state - if (_state.compareAndSet(state, node)) return node + // try to move to the SINGLE state + if (_state.compareAndSet(state, node)) return true } else promoteEmptyToNodeList(state) // that way we can add listener for non-active coroutine } - is Incomplete -> { - val list = state.list - if (list == null) { // SINGLE/SINGLE+ - promoteSingleToNodeList(state as JobNode) - } else { - var rootCause: Throwable? = null - var handle: DisposableHandle = NonDisposableHandle - if (node.onCancelling && state is Finishing) { - synchronized(state) { - // check if we are installing cancellation handler on job that is being cancelled - rootCause = state.rootCause // != null if cancelling job - // We add node to the list in two cases --- either the job is not being cancelled - // or we are adding a child to a coroutine that is not completing yet - if (rootCause == null || node is ChildHandleNode && !state.isCompleting) { - // Note: add node the list while holding lock on state (make sure it cannot change) - if (!addLastAtomic(state, list, node)) return@loopOnState // retry - // just return node if we don't have to invoke handler (not cancelling yet) - if (rootCause == null) return node - // otherwise handler is invoked immediately out of the synchronized section & handle returned - handle = node - } - } - } - if (rootCause != null) { - // Note: attachChild uses invokeImmediately, so it gets invoked when adding to cancelled job - if (invokeImmediately) node.invoke(rootCause) - return handle - } else { - if (addLastAtomic(state, list, node)) return node - } - } - } - else -> { // is complete - // :KLUDGE: We have to invoke a handler in platform-specific way via `invokeIt` extension, - // because we play type tricks on Kotlin/JS and handler is not necessarily a function there - if (invokeImmediately) node.invoke((state as? CompletedExceptionally)?.cause) - return NonDisposableHandle + is Incomplete -> when (val list = state.list) { + null -> promoteSingleToNodeList(state as JobNode) + else -> if (tryAdd(state, list)) return true } + else -> return false } } } - private fun addLastAtomic(expect: Any, list: NodeList, node: JobNode) = - list.addLastIf(node) { this.state === expect } - private fun promoteEmptyToNodeList(state: Empty) { // try to promote it to LIST state with the corresponding state val list = NodeList() @@ -874,13 +906,13 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren // atomically transition to finishing & completing state val finishing = state as? Finishing ?: Finishing(list, false, null) // must synchronize updates to finishing state - var notifyRootCause: Throwable? = null + val notifyRootCause: Throwable? synchronized(finishing) { // check if this state is already completing if (finishing.isCompleting) return COMPLETING_ALREADY // mark as completing finishing.isCompleting = true - // if we need to promote to finishing then atomically do it here. + // if we need to promote to finishing, then atomically do it here. // We do it as early is possible while still holding the lock. This ensures that we cancelImpl asap // (if somebody else is faster) and we synchronize all the threads on this finishing lock asap. if (finishing !== state) { @@ -894,12 +926,21 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren // If it just becomes cancelling --> must process cancelling notifications notifyRootCause = finishing.rootCause.takeIf { !wasCancelling } } - // process cancelling notification here -- it cancels all the children _before_ we start to to wait them (sic!!!) + // process cancelling notification here -- it cancels all the children _before_ we start to wait them (sic!!!) notifyRootCause?.let { notifyCancelling(list, it) } // now wait for children - val child = firstChild(state) + // we can't close the list yet: while there are active children, adding new ones is still allowed. + val child = list.nextChild() if (child != null && tryWaitForChild(finishing, child, proposedUpdate)) return COMPLETING_WAITING_CHILDREN + // turns out, there are no children to await, so we close the list. + list.close(LIST_CHILD_PERMISSION) + // some children could have sneaked into the list, so we try waiting for them again. + // it would be more correct to re-open the list (otherwise, we get non-linearizable behavior), + // but it's too difficult with the current lock-free list implementation. + val anotherChild = list.nextChild() + if (anotherChild != null && tryWaitForChild(finishing, anotherChild, proposedUpdate)) + return COMPLETING_WAITING_CHILDREN // otherwise -- we have not children left (all were already cancelled?) return finalizeFinishingState(finishing, proposedUpdate) } @@ -907,9 +948,6 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren private val Any?.exceptionOrNull: Throwable? get() = (this as? CompletedExceptionally)?.cause - private fun firstChild(state: Incomplete) = - state as? ChildHandleNode ?: state.list?.nextChild() - // return false when there is no more incomplete children to wait // ## IMPORTANT INVARIANT: Only one thread can be concurrently invoking this method. private tailrec fun tryWaitForChild(state: Finishing, child: ChildHandleNode, proposedUpdate: Any?): Boolean { @@ -925,11 +963,25 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren // ## IMPORTANT INVARIANT: Only one thread can be concurrently invoking this method. private fun continueCompleting(state: Finishing, lastChild: ChildHandleNode, proposedUpdate: Any?) { assert { this.state === state } // consistency check -- it cannot change while we are waiting for children - // figure out if we need to wait for next child + // figure out if we need to wait for the next child val waitChild = lastChild.nextChild() - // try wait for next child + // try to wait for the next child if (waitChild != null && tryWaitForChild(state, waitChild, proposedUpdate)) return // waiting for next child - // no more children to wait -- try update state + // no more children to await, so *maybe* we can complete the job; for that, we stop accepting new children. + // potentially, the list can be closed for children more than once: if we detect that there are no more + // children, attempt to close the list, and then new children sneak in, this whole logic will be + // repeated, including closing the list. + state.list.close(LIST_CHILD_PERMISSION) + // did any new children sneak in? + val waitChildAgain = lastChild.nextChild() + if (waitChildAgain != null && tryWaitForChild(state, waitChildAgain, proposedUpdate)) { + // yes, so now we have to wait for them! + // ideally, we should re-open the list, + // but it's too difficult with the current lock-free list implementation, + // so we'll live with non-linearizable behavior for now. + return + } + // no more children, now we are sure; try to update the state val finalState = finalizeFinishingState(state, proposedUpdate) afterCompletion(finalState) } @@ -958,14 +1010,75 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren public final override fun attachChild(child: ChildJob): ChildHandle { /* * Note: This function attaches a special ChildHandleNode node object. This node object - * is handled in a special way on completion on the coroutine (we wait for all of them) and - * is handled specially by invokeOnCompletion itself -- it adds this node to the list even - * if the job is already cancelling. For cancelling state child is attached under state lock. - * It's required to properly wait all children before completion and provide linearizable hierarchy view: - * If child is attached when the job is already being cancelled, such child will receive immediate notification on - * cancellation, but parent *will* wait for that child before completion and will handle its exception. + * is handled in a special way on completion on the coroutine (we wait for all of them) and also + * can't be added simply with `invokeOnCompletionInternal` -- we add this node to the list even + * if the job is already cancelling. + * It's required to properly await all children before completion and provide a linearizable hierarchy view: + * If the child is attached when the job is already being cancelled, such a child will receive + * an immediate notification on cancellation, + * but the parent *will* wait for that child before completion and will handle its exception. */ - return invokeOnCompletion(handler = ChildHandleNode(child)) as ChildHandle + val node = ChildHandleNode(child).also { it.job = this } + val added = tryPutNodeIntoList(node) { _, list -> + // First, try to add a child along the cancellation handlers + val addedBeforeCancellation = list.addLast( + node, + LIST_ON_COMPLETION_PERMISSION or LIST_CHILD_PERMISSION or LIST_CANCELLATION_PERMISSION + ) + if (addedBeforeCancellation) { + // The child managed to be added before the parent started to cancel or complete. Success. + true + } else { + /* Either cancellation or completion already happened, the child was not added. + * Now we need to try adding it just for completion. */ + val addedBeforeCompletion = list.addLast( + node, + LIST_CHILD_PERMISSION or LIST_ON_COMPLETION_PERMISSION + ) + /* + * Whether or not we managed to add the child before the parent completed, we need to investigate: + * why didn't we manage to add it before cancellation? + * If it's because cancellation happened in the meantime, we need to notify the child about it. + * We check the latest state because the original state with which we started may not have had + * the information about the cancellation yet. + */ + val rootCause = when (val latestState = this.state) { + is Finishing -> { + // The state is still incomplete, so we need to notify the child about the completion cause. + latestState.rootCause + } + else -> { + /** Since the list is already closed for [onCancelling], the job is either Finishing or + * already completed. We need to notify the child about the completion cause. */ + assert { latestState !is Incomplete } + (latestState as? CompletedExceptionally)?.cause + } + } + /** + * We must cancel the child if the parent was cancelled already, even if we successfully attached, + * as this child didn't make it before [notifyCancelling] and won't be notified that it should be + * cancelled. + * + * And if the parent wasn't cancelled and the previous [LockFreeLinkedListNode.addLast] failed because + * the job is in its final state already, we won't be able to attach anyway, so we must just invoke + * the handler and return. + */ + node.invoke(rootCause) + if (addedBeforeCompletion) { + /** The root cause can't be null: since the earlier addition to the list failed, this means that + * the job was already cancelled or completed. */ + assert { rootCause != null } + true + } else { + /** No sense in retrying: we know it won't succeed, and we already invoked the handler. */ + return NonDisposableHandle + } + } + } + if (added) return node + /** We can only end up here if [tryPutNodeIntoList] detected a final state. */ + node.invoke((state as? CompletedExceptionally)?.cause) + return NonDisposableHandle } /** @@ -1296,6 +1409,11 @@ private val SEALED = Symbol("SEALED") private val EMPTY_NEW = Empty(false) private val EMPTY_ACTIVE = Empty(true) +// bit mask +private const val LIST_ON_COMPLETION_PERMISSION = 1 +private const val LIST_CHILD_PERMISSION = 2 +private const val LIST_CANCELLATION_PERMISSION = 4 + private class Empty(override val isActive: Boolean) : Incomplete { override val list: NodeList? get() = null override fun toString(): String = "Empty{${if (isActive) "Active" else "New" }}" diff --git a/kotlinx-coroutines-core/common/src/internal/Atomic.kt b/kotlinx-coroutines-core/common/src/internal/Atomic.kt deleted file mode 100644 index eddddc72f1..0000000000 --- a/kotlinx-coroutines-core/common/src/internal/Atomic.kt +++ /dev/null @@ -1,67 +0,0 @@ -@file:Suppress("NO_EXPLICIT_VISIBILITY_IN_API_MODE") - -package kotlinx.coroutines.internal - -import kotlinx.atomicfu.atomic -import kotlinx.coroutines.* -import kotlin.jvm.* - -/** - * The most abstract operation that can be in process. Other threads observing an instance of this - * class in the fields of their object shall invoke [perform] to help. - * - * @suppress **This is unstable API and it is subject to change.** - */ -public abstract class OpDescriptor { - /** - * Returns `null` is operation was performed successfully or some other - * object that indicates the failure reason. - */ - abstract fun perform(affected: Any?): Any? - - override fun toString(): String = "$classSimpleName@$hexAddress" // debug -} - -@JvmField -internal val NO_DECISION: Any = Symbol("NO_DECISION") - -/** - * Descriptor for multi-word atomic operation. - * Based on paper - * ["A Practical Multi-Word Compare-and-Swap Operation"](https://www.cl.cam.ac.uk/research/srg/netos/papers/2002-casn.pdf) - * by Timothy L. Harris, Keir Fraser and Ian A. Pratt. - * - * Note: parts of atomic operation must be globally ordered. Otherwise, this implementation will produce - * `StackOverflowError`. - * - * @suppress **This is unstable API and it is subject to change.** - */ -@InternalCoroutinesApi -public abstract class AtomicOp : OpDescriptor() { - private val _consensus = atomic(NO_DECISION) - - private fun decide(decision: Any?): Any? { - assert { decision !== NO_DECISION } - val current = _consensus.value - if (current !== NO_DECISION) return current - if (_consensus.compareAndSet(NO_DECISION, decision)) return decision - return _consensus.value - } - - abstract fun prepare(affected: T): Any? // `null` if Ok, or failure reason - - abstract fun complete(affected: T, failure: Any?) // failure != null if failed to prepare op - - // returns `null` on success - @Suppress("UNCHECKED_CAST") - final override fun perform(affected: Any?): Any? { - // make decision on status - var decision = this._consensus.value - if (decision === NO_DECISION) { - decision = decide(prepare(affected as T)) - } - // complete operation - complete(affected as T, decision) - return decision - } -} diff --git a/kotlinx-coroutines-core/common/src/internal/LockFreeLinkedList.common.kt b/kotlinx-coroutines-core/common/src/internal/LockFreeLinkedList.common.kt index 7f9ee666ab..32209fc0ac 100644 --- a/kotlinx-coroutines-core/common/src/internal/LockFreeLinkedList.common.kt +++ b/kotlinx-coroutines-core/common/src/internal/LockFreeLinkedList.common.kt @@ -2,22 +2,22 @@ package kotlinx.coroutines.internal -import kotlinx.coroutines.* -import kotlin.jvm.* - /** @suppress **This is unstable API and it is subject to change.** */ public expect open class LockFreeLinkedListNode() { public val isRemoved: Boolean public val nextNode: LockFreeLinkedListNode public val prevNode: LockFreeLinkedListNode + public fun addLast(node: LockFreeLinkedListNode, permissionsBitmask: Int): Boolean public fun addOneIfEmpty(node: LockFreeLinkedListNode): Boolean - public inline fun addLastIf(node: LockFreeLinkedListNode, crossinline condition: () -> Boolean): Boolean public open fun remove(): Boolean + /** + * Closes the list for anything that requests the permission [forbiddenElementsBit]. + * Only a single permission can be forbidden at a time, but this isn't checked. + */ + public fun close(forbiddenElementsBit: Int) } -internal fun LockFreeLinkedListNode.addLast(node: LockFreeLinkedListNode) = addLastIf(node) { true } - /** @suppress **This is unstable API and it is subject to change.** */ public expect open class LockFreeLinkedListHead() : LockFreeLinkedListNode { public inline fun forEach(block: (LockFreeLinkedListNode) -> Unit) diff --git a/kotlinx-coroutines-core/common/test/JobTest.kt b/kotlinx-coroutines-core/common/test/JobTest.kt index b86ac73138..55119ab65c 100644 --- a/kotlinx-coroutines-core/common/test/JobTest.kt +++ b/kotlinx-coroutines-core/common/test/JobTest.kt @@ -174,6 +174,20 @@ class JobTest : TestBase() { finish(4) } + @Test + fun testInvokeOnCancellingFiringOnNormalExit() = runTest { + val job = launch { + expect(2) + } + job.invokeOnCompletion(onCancelling = true) { + assertNull(it) + expect(3) + } + expect(1) + job.join() + finish(4) + } + @Test fun testOverriddenParent() = runTest { val parent = Job() diff --git a/kotlinx-coroutines-core/concurrent/src/internal/LockFreeLinkedList.kt b/kotlinx-coroutines-core/concurrent/src/internal/LockFreeLinkedList.kt index b2d069e6aa..a172e66c8b 100644 --- a/kotlinx-coroutines-core/concurrent/src/internal/LockFreeLinkedList.kt +++ b/kotlinx-coroutines-core/concurrent/src/internal/LockFreeLinkedList.kt @@ -8,18 +8,6 @@ import kotlin.jvm.* private typealias Node = LockFreeLinkedListNode -@PublishedApi -internal const val UNDECIDED: Int = 0 - -@PublishedApi -internal const val SUCCESS: Int = 1 - -@PublishedApi -internal const val FAILURE: Int = 2 - -@PublishedApi -internal val CONDITION_FALSE: Any = Symbol("CONDITION_FALSE") - /** * Doubly-linked concurrent list node with remove support. * Based on paper @@ -49,37 +37,10 @@ public actual open class LockFreeLinkedListNode { private fun removed(): Removed = _removedRef.value ?: Removed(this).also { _removedRef.lazySet(it) } - @PublishedApi - internal abstract class CondAddOp( - @JvmField val newNode: Node - ) : AtomicOp() { - @JvmField var oldNext: Node? = null - - override fun complete(affected: Node, failure: Any?) { - val success = failure == null - val update = if (success) newNode else oldNext - if (update != null && affected._next.compareAndSet( this, update)) { - // only the thread the makes this update actually finishes add operation - if (success) newNode.finishAdd(oldNext!!) - } - } - } - - @PublishedApi - internal inline fun makeCondAddOp(node: Node, crossinline condition: () -> Boolean): CondAddOp = - object : CondAddOp(node) { - override fun prepare(affected: Node): Any? = if (condition()) null else CONDITION_FALSE - } - public actual open val isRemoved: Boolean get() = next is Removed // LINEARIZABLE. Returns Node | Removed - public val next: Any get() { - _next.loop { next -> - if (next !is OpDescriptor) return next - next.perform(this) - } - } + public val next: Any get() = _next.value // LINEARIZABLE. Returns next non-removed Node public actual val nextNode: Node get() = @@ -117,20 +78,27 @@ public actual open class LockFreeLinkedListNode { // ------ addLastXXX ------ /** - * Adds last item to this list atomically if the [condition] is true. + * Adds last item to this list. Returns `false` if the list is closed. */ - public actual inline fun addLastIf(node: Node, crossinline condition: () -> Boolean): Boolean { - val condAdd = makeCondAddOp(node, condition) + public actual fun addLast(node: Node, permissionsBitmask: Int): Boolean { while (true) { // lock-free loop on prev.next - val prev = prevNode // sentinel node is never removed, so prev is always defined - when (prev.tryCondAddNext(node, this, condAdd)) { - SUCCESS -> return true - FAILURE -> return false + val currentPrev = prevNode + return when { + currentPrev is ListClosed -> + currentPrev.forbiddenElementsBitmask and permissionsBitmask == 0 && + currentPrev.addLast(node, permissionsBitmask) + currentPrev.addNext(node, this) -> true + else -> continue } } } - // ------ addXXX util ------ + /** + * Forbids adding new items to this list. + */ + public actual fun close(forbiddenElementsBit: Int) { + addLast(ListClosed(forbiddenElementsBit), forbiddenElementsBit) + } /** * Given: @@ -165,17 +133,6 @@ public actual open class LockFreeLinkedListNode { return true } - // returns UNDECIDED, SUCCESS or FAILURE - @PublishedApi - internal fun tryCondAddNext(node: Node, next: Node, condAdd: CondAddOp): Int { - node._prev.lazySet(this) - node._next.lazySet(next) - condAdd.oldNext = next - if (!_next.compareAndSet(next, condAdd)) return UNDECIDED - // added operation successfully (linearized) -- complete it & fixup the list - return if (condAdd.perform(this) == null) SUCCESS else FAILURE - } - // ------ removeXXX ------ /** @@ -273,10 +230,6 @@ public actual open class LockFreeLinkedListNode { } // slow path when we need to help remove operations this.isRemoved -> return null // nothing to do, this node was removed, bail out asap to save time - prevNext is OpDescriptor -> { // help & retry - prevNext.perform(prev) - return correctPrev() // retry from scratch - } prevNext is Removed -> { if (last !== null) { // newly added (prev) node is already removed, correct last.next around it @@ -332,3 +285,5 @@ public actual open class LockFreeLinkedListHead : LockFreeLinkedListNode() { // optimization: because head is never removed, we don't have to read _next.value to check these: override val isRemoved: Boolean get() = false } + +private class ListClosed(@JvmField val forbiddenElementsBitmask: Int): LockFreeLinkedListNode() diff --git a/kotlinx-coroutines-core/jsAndWasmShared/src/internal/LinkedList.kt b/kotlinx-coroutines-core/jsAndWasmShared/src/internal/LinkedList.kt index 6e81c79f40..6810d614d1 100644 --- a/kotlinx-coroutines-core/jsAndWasmShared/src/internal/LinkedList.kt +++ b/kotlinx-coroutines-core/jsAndWasmShared/src/internal/LinkedList.kt @@ -14,13 +14,20 @@ public actual open class LockFreeLinkedListNode { inline actual val prevNode get() = _prev inline actual val isRemoved get() = _removed - @PublishedApi - internal fun addLast(node: Node) { - val prev = this._prev - node._next = this - node._prev = prev - prev._next = node - this._prev = node + public actual fun addLast(node: Node, permissionsBitmask: Int): Boolean = when (val prev = this._prev) { + is ListClosed -> + prev.forbiddenElementsBitmask and permissionsBitmask == 0 && prev.addLast(node, permissionsBitmask) + else -> { + node._next = this + node._prev = prev + prev._next = node + this._prev = node + true + } + } + + public actual fun close(forbiddenElementsBit: Int) { + addLast(ListClosed(forbiddenElementsBit), forbiddenElementsBit) } /* @@ -30,10 +37,6 @@ public actual open class LockFreeLinkedListNode { * invokes handler on remove */ public actual open fun remove(): Boolean { - return removeImpl() - } - - private fun removeImpl(): Boolean { if (_removed) return false val prev = this._prev val next = this._next @@ -45,13 +48,7 @@ public actual open class LockFreeLinkedListNode { public actual fun addOneIfEmpty(node: Node): Boolean { if (_next !== this) return false - addLast(node) - return true - } - - public actual inline fun addLastIf(node: Node, crossinline condition: () -> Boolean): Boolean { - if (!condition()) return false - addLast(node) + addLast(node, Int.MIN_VALUE) return true } } @@ -72,3 +69,5 @@ public actual open class LockFreeLinkedListHead : Node() { // just a defensive programming -- makes sure that list head sentinel is never removed public actual final override fun remove(): Nothing = throw UnsupportedOperationException() } + +private class ListClosed(val forbiddenElementsBitmask: Int): LockFreeLinkedListNode() diff --git a/kotlinx-coroutines-core/jsAndWasmShared/test/internal/LinkedListTest.kt b/kotlinx-coroutines-core/jsAndWasmShared/test/internal/LinkedListTest.kt index d88ddf18f3..305484f741 100644 --- a/kotlinx-coroutines-core/jsAndWasmShared/test/internal/LinkedListTest.kt +++ b/kotlinx-coroutines-core/jsAndWasmShared/test/internal/LinkedListTest.kt @@ -12,13 +12,13 @@ class LinkedListTest { fun testSimpleAddLastRemove() { val list = LockFreeLinkedListHead() assertContents(list) - val n1 = IntNode(1).apply { list.addLast(this) } + val n1 = IntNode(1).apply { list.addLast(this, Int.MAX_VALUE) } assertContents(list, 1) - val n2 = IntNode(2).apply { list.addLast(this) } + val n2 = IntNode(2).apply { list.addLast(this, Int.MAX_VALUE) } assertContents(list, 1, 2) - val n3 = IntNode(3).apply { list.addLast(this) } + val n3 = IntNode(3).apply { list.addLast(this, Int.MAX_VALUE) } assertContents(list, 1, 2, 3) - val n4 = IntNode(4).apply { list.addLast(this) } + val n4 = IntNode(4).apply { list.addLast(this, Int.MAX_VALUE) } assertContents(list, 1, 2, 3, 4) assertTrue(n1.remove()) assertContents(list, 2, 3, 4) diff --git a/kotlinx-coroutines-core/jvm/test/JobChildStressTest.kt b/kotlinx-coroutines-core/jvm/test/JobChildStressTest.kt index 3ac1967b9f..a30e6393b6 100644 --- a/kotlinx-coroutines-core/jvm/test/JobChildStressTest.kt +++ b/kotlinx-coroutines-core/jvm/test/JobChildStressTest.kt @@ -1,26 +1,36 @@ package kotlinx.coroutines import kotlinx.coroutines.testing.* -import org.junit.* -import org.junit.Test import java.util.concurrent.* +import java.util.concurrent.atomic.* import kotlin.test.* +/** + * Testing the procedure of attaching a child to the parent job. + */ class JobChildStressTest : TestBase() { private val N_ITERATIONS = 10_000 * stressTestMultiplier private val pool = newFixedThreadPoolContext(3, "JobChildStressTest") - @After + @AfterTest fun tearDown() { pool.close() } /** - * Perform concurrent launch of a child job & cancellation of the explicit parent job + * Tests attaching a child while the parent is trying to finalize its state. + * + * Checks the following interleavings: + * - A child attaches before the parent is cancelled. + * - A child attaches after the parent is cancelled, but before the parent notifies anyone about it. + * - A child attaches after the parent notifies the children about being cancelled, + * but before it starts waiting for its children. + * - A child attempts to attach after the parent stops waiting for its children, + * which immediately cancels the child. */ @Test @Suppress("PLATFORM_CLASS_MAPPED_TO_KOTLIN") - fun testChild() = runTest { + fun testChildAttachmentRacingWithCancellation() = runTest { val barrier = CyclicBarrier(3) repeat(N_ITERATIONS) { var wasLaunched = false @@ -29,7 +39,7 @@ class JobChildStressTest : TestBase() { unhandledException = ex } val scope = CoroutineScope(pool + handler) - val parent = CompletableDeferred() + val parent = createCompletableDeferredForTesting(it) // concurrent child launcher val launcher = scope.launch { barrier.await() @@ -54,4 +64,44 @@ class JobChildStressTest : TestBase() { } } } + + /** + * Tests attaching a child while the parent is waiting for the last child job to complete. + * + * Checks the following interleavings: + * - A child attaches while the parent is already completing, but is waiting for its children. + * - A child attempts to attach after the parent stops waiting for its children, + * which immediately cancels the child. + */ + @Test + fun testChildAttachmentRacingWithLastChildCompletion() { + // All exceptions should get aggregated here + repeat(N_ITERATIONS) { + runBlocking { + val rogueJob = AtomicReference() + /** not using [createCompletableDeferredForTesting] because we don't need extra children. */ + val deferred = CompletableDeferred() + // optionally, add a completion handler to the parent job, so that the child tries to enter a list with + // multiple elements, not just one. + if (it.mod(2) == 0) { + deferred.invokeOnCompletion { } + } + launch(pool + deferred) { + deferred.complete(Unit) // Transition deferred into "completing" state waiting for current child + // **Asynchronously** submit task that launches a child so it races with completion + pool.executor.execute { + rogueJob.set(launch(pool + deferred) { + throw TestException("isCancelled: ${coroutineContext.job.isCancelled}") + }) + } + } + + deferred.join() + val rogue = rogueJob.get() + if (rogue?.isActive == true) { + throw TestException("Rogue job $rogue with parent " + rogue.parent + " and children list: " + rogue.parent?.children?.toList()) + } + } + } + } } diff --git a/kotlinx-coroutines-core/jvm/test/JobHandlersUpgradeStressTest.kt b/kotlinx-coroutines-core/jvm/test/JobHandlersUpgradeStressTest.kt index dc2314bb6c..3f085b6f20 100644 --- a/kotlinx-coroutines-core/jvm/test/JobHandlersUpgradeStressTest.kt +++ b/kotlinx-coroutines-core/jvm/test/JobHandlersUpgradeStressTest.kt @@ -30,6 +30,9 @@ class JobHandlersUpgradeStressTest : TestBase() { val state = atomic(0) } + /** + * Tests handlers not being invoked more than once. + */ @Test fun testStress() { println("--- JobHandlersUpgradeStressTest") @@ -91,4 +94,4 @@ class JobHandlersUpgradeStressTest : TestBase() { println(" Fired handler ${fired.value} times") } -} \ No newline at end of file +} diff --git a/kotlinx-coroutines-core/jvm/test/JobOnCompletionStressTest.kt b/kotlinx-coroutines-core/jvm/test/JobOnCompletionStressTest.kt new file mode 100644 index 0000000000..3df62b666e --- /dev/null +++ b/kotlinx-coroutines-core/jvm/test/JobOnCompletionStressTest.kt @@ -0,0 +1,192 @@ +package kotlinx.coroutines + +import kotlinx.coroutines.channels.* +import kotlinx.coroutines.testing.* +import java.util.concurrent.CyclicBarrier +import java.util.concurrent.atomic.* +import kotlin.test.* +import kotlin.time.Duration.Companion.seconds + +class JobOnCompletionStressTest: TestBase() { + private val N_ITERATIONS = 10_000 * stressTestMultiplier + private val pool = newFixedThreadPoolContext(2, "JobOnCompletionStressTest") + + private val completionHandlerSeesCompletedParent = AtomicBoolean(false) + private val completionHandlerSeesCancelledParent = AtomicBoolean(false) + private val encounteredException = AtomicReference(null) + + @AfterTest + fun tearDown() { + pool.close() + } + + @Test + fun testOnCompletionRacingWithCompletion() = runTest { + testHandlerRacingWithCancellation( + onCancelling = false, + invokeImmediately = true, + parentCompletion = { complete(Unit) } + ) { + assertNull(encounteredException.get()) + assertTrue(completionHandlerSeesCompletedParent.get()) + assertFalse(completionHandlerSeesCancelledParent.get()) + } + } + + @Test + fun testOnCompletionRacingWithCancellation() = runTest { + testHandlerRacingWithCancellation( + onCancelling = false, + invokeImmediately = true, + parentCompletion = { completeExceptionally(TestException()) } + ) { + assertIs(encounteredException.get()) + assertTrue(completionHandlerSeesCompletedParent.get()) + assertTrue(completionHandlerSeesCancelledParent.get()) + } + } + + @Test + fun testOnCancellingRacingWithCompletion() = runTest { + testHandlerRacingWithCancellation( + onCancelling = true, + invokeImmediately = true, + parentCompletion = { complete(Unit) } + ) { + assertNull(encounteredException.get()) + assertTrue(completionHandlerSeesCompletedParent.get()) + assertFalse(completionHandlerSeesCancelledParent.get()) + } + } + + @Test + fun testOnCancellingRacingWithCancellation() = runTest { + testHandlerRacingWithCancellation( + onCancelling = true, + invokeImmediately = true, + parentCompletion = { completeExceptionally(TestException()) } + ) { + assertIs(encounteredException.get()) + assertTrue(completionHandlerSeesCancelledParent.get()) + } + } + + @Test + fun testNonImmediateOnCompletionRacingWithCompletion() = runTest { + testHandlerRacingWithCancellation( + onCancelling = false, + invokeImmediately = false, + parentCompletion = { complete(Unit) } + ) { + assertNull(encounteredException.get()) + assertTrue(completionHandlerSeesCompletedParent.get()) + assertFalse(completionHandlerSeesCancelledParent.get()) + } + } + + @Test + fun testNonImmediateOnCompletionRacingWithCancellation() = runTest { + testHandlerRacingWithCancellation( + onCancelling = false, + invokeImmediately = false, + parentCompletion = { completeExceptionally(TestException()) } + ) { + assertIs(encounteredException.get()) + assertTrue(completionHandlerSeesCompletedParent.get()) + assertTrue(completionHandlerSeesCancelledParent.get()) + } + } + + @Test + fun testNonImmediateOnCancellingRacingWithCompletion() = runTest { + testHandlerRacingWithCancellation( + onCancelling = true, + invokeImmediately = false, + parentCompletion = { complete(Unit) } + ) { + assertNull(encounteredException.get()) + assertTrue(completionHandlerSeesCompletedParent.get()) + assertFalse(completionHandlerSeesCancelledParent.get()) + } + } + + @Test + fun testNonImmediateOnCancellingRacingWithCancellation() = runTest { + testHandlerRacingWithCancellation( + onCancelling = true, + invokeImmediately = false, + parentCompletion = { completeExceptionally(TestException()) } + ) { + assertIs(encounteredException.get()) + assertTrue(completionHandlerSeesCancelledParent.get()) + } + } + + private suspend fun testHandlerRacingWithCancellation( + onCancelling: Boolean, + invokeImmediately: Boolean, + parentCompletion: CompletableDeferred.() -> Unit, + validate: () -> Unit, + ) { + repeat(N_ITERATIONS) { + val entered = Channel(1) + completionHandlerSeesCompletedParent.set(false) + completionHandlerSeesCancelledParent.set(false) + encounteredException.set(null) + val parent = createCompletableDeferredForTesting(it) + val barrier = CyclicBarrier(2) + val handlerInstallJob = coroutineScope { + launch(pool) { + barrier.await() + parent.parentCompletion() + } + async(pool) { + barrier.await() + parent.invokeOnCompletion( + onCancelling = onCancelling, + invokeImmediately = invokeImmediately, + ) { exception -> + encounteredException.set(exception) + completionHandlerSeesCompletedParent.set(parent.isCompleted) + completionHandlerSeesCancelledParent.set(parent.isCancelled) + entered.trySend(Unit) + } + } + } + if (invokeImmediately || handlerInstallJob.getCompleted() !== NonDisposableHandle) { + withTimeout(1.seconds) { + entered.receive() + } + try { + validate() + } catch (e: Throwable) { + println("Iteration $it failed") + println("invokeOnCompletion returned ${handlerInstallJob.getCompleted()}") + throw e + } + } else { + assertTrue(entered.isEmpty) + } + } + } +} + +/** + * Creates a [CompletableDeferred], optionally adding completion handlers and/or other children to the job depending + * on [iteration]. + * The purpose is to test not just attaching completion handlers to empty or one-element lists (see the [JobSupport] + * implementation for details on what this means), but also to lists with multiple elements. + */ +fun createCompletableDeferredForTesting(iteration: Int): CompletableDeferred { + val parent = CompletableDeferred() + /* We optionally add completion handlers and/or other children to the parent job + to test the scenarios where a child is placed into an empty list, a single-element list, + or a list with multiple elements. */ + if (iteration.mod(2) == 0) { + parent.invokeOnCompletion { } + } + if (iteration.mod(3) == 0) { + GlobalScope.launch(parent) { } + } + return parent +} diff --git a/kotlinx-coroutines-core/jvm/test/internal/LockFreeLinkedListLongStressTest.kt b/kotlinx-coroutines-core/jvm/test/internal/LockFreeLinkedListLongStressTest.kt index 21adeb401e..95be1cbe62 100644 --- a/kotlinx-coroutines-core/jvm/test/internal/LockFreeLinkedListLongStressTest.kt +++ b/kotlinx-coroutines-core/jvm/test/internal/LockFreeLinkedListLongStressTest.kt @@ -1,6 +1,5 @@ package kotlinx.coroutines.internal -import kotlinx.coroutines.testing.* import kotlinx.coroutines.testing.TestBase import org.junit.Test import java.util.* @@ -31,7 +30,7 @@ class LockFreeLinkedListLongStressTest : TestBase() { for (j in 0 until nAddThreads) threads += thread(start = false, name = "adder-$j") { for (i in j until nAdded step nAddThreads) { - list.addLast(IntNode(i)) + list.addLast(IntNode(i), Int.MAX_VALUE) } println("${Thread.currentThread().name} completed") workingAdders.decrementAndGet()