Skip to content

Commit

Permalink
Migrate channels and related operators to common, so channels can be …
Browse files Browse the repository at this point in the history
…used from JS

Fixes #201
  • Loading branch information
qwwdfsad authored and elizarov committed Apr 26, 2018
1 parent f4eb05a commit 9619134
Show file tree
Hide file tree
Showing 50 changed files with 711 additions and 349 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package kotlinx.coroutines.experimental

public expect class CompletionHandlerException(message: String, cause: Throwable) : RuntimeException

public expect open class CancellationException(message: String) : IllegalStateException
public expect open class CancellationException(message: String?) : IllegalStateException

public expect class JobCancellationException(
message: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package kotlinx.coroutines.experimental.channels

import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.internal.*
import kotlinx.coroutines.experimental.internalAnnotations.*
import kotlinx.coroutines.experimental.intrinsics.*
import kotlinx.coroutines.experimental.selects.*
import kotlin.coroutines.experimental.*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,9 @@

package kotlinx.coroutines.experimental.channels

import kotlinx.coroutines.experimental.internal.*
import kotlinx.coroutines.experimental.internalAnnotations.*
import kotlinx.coroutines.experimental.selects.*
import java.util.concurrent.*
import java.util.concurrent.locks.*
import kotlin.concurrent.*

/**
* Broadcast channel with array buffer of a fixed [capacity].
Expand Down Expand Up @@ -64,7 +63,7 @@ class ArrayBroadcastChannel<E>(
So read/writes to buffer need not be volatile
*/

private val subs = CopyOnWriteArrayList<Subscriber<E>>()
private val subs = subscriberList<Subscriber<E>>()

override val isBufferAlwaysFull: Boolean get() = false
override val isBufferFull: Boolean get() = size >= capacity
Expand Down Expand Up @@ -132,7 +131,6 @@ class ArrayBroadcastChannel<E>(

// updates head if needed and optionally adds / removes subscriber under the same lock
private tailrec fun updateHead(addSub: Subscriber<E>? = null, removeSub: Subscriber<E>? = null) {
assert(addSub == null || removeSub == null) // only one of them can be specified
// update head in a tail rec loop
var send: Send? = null
var token: Any? = null
Expand Down Expand Up @@ -200,7 +198,8 @@ class ArrayBroadcastChannel<E>(
) : AbstractChannel<E>(), SubscriptionReceiveChannel<E> {
private val subLock = ReentrantLock()

@Volatile @JvmField
@Volatile
@JvmField
var subHead: Long = 0 // guarded by subLock

override val isBufferAlwaysEmpty: Boolean get() = false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@

package kotlinx.coroutines.experimental.channels

import kotlinx.coroutines.experimental.internal.*
import kotlinx.coroutines.experimental.internalAnnotations.Volatile
import kotlinx.coroutines.experimental.selects.*
import java.util.concurrent.locks.*
import kotlin.concurrent.*

/**
* Channel with array buffer of a fixed [capacity].
Expand Down Expand Up @@ -249,4 +249,4 @@ public open class ArrayChannel<E>(

override val bufferDebugString: String
get() = "(buffer:capacity=${buffer.size},size=$size)"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@

package kotlinx.coroutines.experimental.channels

import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.channels.Channel.Factory.CONFLATED
import kotlinx.coroutines.experimental.channels.Channel.Factory.UNLIMITED
import java.io.Closeable
import kotlinx.coroutines.experimental.internal.Closeable

/**
* Broadcast channel is a non-blocking primitive for communication between the sender and multiple receivers
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,12 @@ internal open class ChannelCoroutine<E>(
val channel: Channel<E>
get() = this

// Workaround for KT-23094
override suspend fun receive(): E = _channel.receive()

override suspend fun send(element: E) = _channel.send(element)

override suspend fun receiveOrNull(): E? = _channel.receiveOrNull()

override fun cancel(cause: Throwable?): Boolean = super.cancel(cause)
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,24 +21,6 @@ import kotlin.coroutines.experimental.*

internal const val DEFAULT_CLOSE_MESSAGE = "Channel was closed"

// -------- Operations on SendChannel --------

/**
* Adds [element] into to this channel, **blocking** the caller while this channel [Channel.isFull],
* or throws exception if the channel [Channel.isClosedForSend] (see [Channel.close] for details).
*
* This is a way to call [Channel.send] method inside a blocking code using [runBlocking],
* so this function should not be used from coroutine.
*/
public fun <E> SendChannel<E>.sendBlocking(element: E) {
// fast path
if (offer(element))
return
// slow path
runBlocking {
send(element)
}
}

// -------- Conversions to ReceiveChannel --------

Expand Down Expand Up @@ -120,7 +102,7 @@ public fun consumesAll(vararg channels: ReceiveChannel<*>): CompletionHandler =
if (exception == null) {
exception = e
} else {
exception.addSuppressed(e)
exception.addSuppressedThrowable(e)
}
}
exception?.let { throw it }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,11 @@

package kotlinx.coroutines.experimental.channels

import kotlinx.atomicfu.atomic
import kotlinx.atomicfu.loop
import kotlinx.coroutines.experimental.internal.Symbol
import kotlinx.coroutines.experimental.intrinsics.startCoroutineUndispatched
import kotlinx.coroutines.experimental.selects.SelectClause2
import kotlinx.coroutines.experimental.selects.SelectInstance
import kotlinx.atomicfu.*
import kotlinx.coroutines.experimental.internal.*
import kotlinx.coroutines.experimental.internalAnnotations.*
import kotlinx.coroutines.experimental.intrinsics.*
import kotlinx.coroutines.experimental.selects.*

/**
* Broadcasts the most recently sent element (aka [value]) to all [openSubscription] subscribers.
Expand Down Expand Up @@ -162,8 +161,8 @@ public class ConflatedBroadcastChannel<E>() : BroadcastChannel<E> {
check(i >= 0)
if (n == 1) return null
val update = arrayOfNulls<Subscriber<E>>(n - 1)
System.arraycopy(list, 0, update, 0, i)
System.arraycopy(list, i + 1, update, i, n - i - 1)
arraycopy(list, 0, update, 0, i)
arraycopy(list, i + 1, update, i, n - i - 1)
return update as Array<Subscriber<E>>
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,4 +80,3 @@ public open class ConflatedChannel<E> : AbstractChannel<E>() {
}
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package kotlinx.coroutines.experimental.internal

/**
* Cross-platform array copy. Overlaps of source and destination are not supported
*/
expect fun <E> arraycopy(source: Array<E>, srcPos: Int, destination: Array<E?>, destinationStart: Int, length: Int)
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package kotlinx.coroutines.experimental.internal

expect interface Closeable {
fun close()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package kotlinx.coroutines.experimental.internal

/**
* Special kind of list intended to be used as collection of subscribers in [ArrayBroadcastChannel]
* On JVM it's CopyOnWriteList and on JS it's MutableList.
*
* Note that this alias is intentionally not named as CopyOnWriteList to avoid accidental misusage outside of ArrayBroadcastChannel
*/
typealias SubscribersList<E> = MutableList<E>

expect fun <E> subscriberList(): SubscribersList<E>

expect class ReentrantLock() {
fun tryLock(): Boolean
fun unlock(): Unit
}

expect inline fun <T> ReentrantLock.withLock(action: () -> T): T
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@

package kotlinx.coroutines.experimental.internal

import kotlin.jvm.*

/** @suppress **This is unstable API and it is subject to change.** */
public expect open class LockFreeLinkedListNode() {
public val isRemoved: Boolean
public val next: Any
public val nextNode: LockFreeLinkedListNode
public val prev: Any
public val prevNode: LockFreeLinkedListNode
public fun addLast(node: LockFreeLinkedListNode)
public fun addOneIfEmpty(node: LockFreeLinkedListNode): Boolean
Expand Down Expand Up @@ -57,11 +57,23 @@ public expect open class AddLastDesc<T : LockFreeLinkedListNode>(
val queue: LockFreeLinkedListNode
val node: T
protected override fun onPrepare(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode): Any?
override fun finishOnSuccess(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode)
}

public expect open class RemoveFirstDesc<T>(queue: LockFreeLinkedListNode): AbstractAtomicDesc {
val queue: LockFreeLinkedListNode
public val result: T
protected open fun validatePrepared(node: T): Boolean
protected final override fun onPrepare(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode): Any?
final override fun finishOnSuccess(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode)
}

/** @suppress **This is unstable API and it is subject to change.** */
public expect abstract class AbstractAtomicDesc : AtomicDesc {
protected abstract fun onPrepare(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode): Any?
final override fun prepare(op: AtomicOp<*>): Any?
final override fun complete(op: AtomicOp<*>, failure: Any?)
protected open fun failure(affected: LockFreeLinkedListNode, next: Any): Any?
protected open fun retry(affected: LockFreeLinkedListNode, next: Any): Boolean
protected abstract fun onPrepare(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode): Any? // non-null on failure
protected abstract fun finishOnSuccess(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode)
}
Loading

0 comments on commit 9619134

Please sign in to comment.