Skip to content

Commit

Permalink
Merge pull request #2249 from djspiewak/feature/bytestack-density
Browse files Browse the repository at this point in the history
  • Loading branch information
djspiewak authored Aug 26, 2021
2 parents 483ede7 + 7e31a48 commit 30168fc
Show file tree
Hide file tree
Showing 5 changed files with 96 additions and 83 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,10 @@ import java.util.concurrent.TimeUnit
@OutputTimeUnit(TimeUnit.SECONDS)
class ParallelBenchmark {

@Param(Array("100", "1000", "10000"))
@Param(Array( /*"100", */ "1000" /*, "10000"*/ ))
var size: Int = _

@Param(Array("100", "1000", "10000", "100000", "1000000"))
@Param(Array( /*"100", "1000", */ "10000" /*, "100000", "1000000"*/ ))
var cpuTokens: Long = _

@Benchmark
Expand Down
5 changes: 4 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,7 @@ lazy val core = crossProject(JSPlatform, JVMPlatform)
ProblemFilters.exclude[DirectMissingMethodProblem]("cats.effect.IO#IOCont.this"),
ProblemFilters.exclude[IncompatibleMethTypeProblem](
"cats.effect.unsafe.IORuntimeCompanionPlatform.installGlobal"),
ProblemFilters.exclude[Problem]("cats.effect.ByteStack.*"),
// introduced by #2254, Check `WorkerThread` ownership before scheduling
// changes to `cats.effect.unsafe` package private code
ProblemFilters.exclude[DirectMissingMethodProblem](
Expand Down Expand Up @@ -450,7 +451,9 @@ lazy val example = crossProject(JSPlatform, JVMPlatform)
lazy val benchmarks = project
.in(file("benchmarks"))
.dependsOn(core.jvm)
.settings(name := "cats-effect-benchmarks")
.settings(
name := "cats-effect-benchmarks",
javaOptions ++= Seq("-Dcats.effect.tracing.mode=none", "-Dcats.effect.tracing.exceptions.enhanced=false"))
.enablePlugins(NoPublishPlugin, JmhPlugin)

lazy val docs = project.in(file("site-docs")).dependsOn(core.jvm).enablePlugins(MdocPlugin)
100 changes: 52 additions & 48 deletions core/shared/src/main/scala/cats/effect/ByteStack.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,65 +16,69 @@

package cats.effect

private[effect] final class ByteStack(
private[this] var buffer: Array[Byte],
private[this] var index: Int) {
private[effect] object ByteStack {

def this(initBound: Int) =
this(new Array[Byte](initBound), 0)

def this() = this(null, 0)

def init(bound: Int): Unit = {
buffer = new Array(bound)
index = 0
final def toDebugString(stack: Array[Int], translate: Byte => String = _.toString): String = {
val count = size(stack)
((count - 1) to 0 by -1)
.foldLeft(
new StringBuilder()
.append("Stack:")
.append(" capacity = ")
.append((stack.length - 1) * 8)
.append(',')
.append(" count = ")
.append(count)
.append(',')
.append(" content (top-first) = [ ")
) { (b, i) => b.append(translate(ByteStack.read(stack, i))).append(' ') }
.append(']')
.toString
}

def push(b: Byte): Unit = {
checkAndGrow()
buffer(index) = b
index += 1
}
final def create(initialMaxOps: Int): Array[Int] =
new Array[Int](1 + 1 + ((initialMaxOps - 1) >> 3)) // count-slot + 1 for each set of 8 ops

// TODO remove bounds check
def pop(): Byte = {
index -= 1
buffer(index)
final def growIfNeeded(stack: Array[Int], count: Int): Array[Int] = {
if ((1 + ((count + 1) >> 3)) < stack.length) {
stack
} else {
val bigger = new Array[Int](stack.length << 1)
System.arraycopy(stack, 0, bigger, 0, stack.length) // Count in stack(0) copied "for free"
bigger
}
}

def peek(): Byte = buffer(index - 1)

def isEmpty(): Boolean = index <= 0
final def push(stack: Array[Int], op: Byte): Array[Int] = {
val c = stack(0) // current count of elements
val use = growIfNeeded(stack, c) // alias so we add to the right place
val s = (c >> 3) + 1 // current slot in `use`
val shift = (c & 7) << 2 // BEGIN MAGIC
use(s) = (use(s) & ~(0xffffffff << shift)) | (op << shift) // END MAGIC
use(0) += 1 // write the new count
use
}

// to allow for external iteration
def unsafeBuffer(): Array[Byte] = buffer
def unsafeIndex(): Int = index
final def size(stack: Array[Int]): Int =
stack(0)

def unsafeSet(newI: Int): Unit =
index = newI
final def isEmpty(stack: Array[Int]): Boolean =
stack(0) < 1

def invalidate(): Unit = {
index = 0
buffer = null
final def read(stack: Array[Int], pos: Int): Byte = {
if (pos < 0 || pos >= stack(0)) throw new ArrayIndexOutOfBoundsException()
((stack((pos >> 3) + 1) >>> ((pos & 7) << 2)) & 0x0000000f).toByte
}

def copy(): ByteStack = {
val buffer2 = if (index == 0) {
new Array[Byte](buffer.length)
} else {
val buffer2 = new Array[Byte](buffer.length)
System.arraycopy(buffer, 0, buffer2, 0, buffer.length)
buffer2
}

new ByteStack(buffer2, index)
final def peek(stack: Array[Int]): Byte = {
val c = stack(0) - 1
if (c < 0) throw new ArrayIndexOutOfBoundsException()
((stack((c >> 3) + 1) >>> ((c & 7) << 2)) & 0x0000000f).toByte
}

private[this] def checkAndGrow(): Unit =
if (index >= buffer.length) {
val len = buffer.length
val buffer2 = new Array[Byte](len * 2)
System.arraycopy(buffer, 0, buffer2, 0, len)
buffer = buffer2
}
final def pop(stack: Array[Int]): Byte = {
val op = peek(stack)
stack(0) -= 1
op
}
}
47 changes: 25 additions & 22 deletions core/shared/src/main/scala/cats/effect/IOFiber.scala
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ private final class IOFiber[A](
* Ideally these would be on the stack, but they can't because we sometimes need to
* relocate our runloop to another fiber.
*/
private[this] val conts: ByteStack = new ByteStack()
private[this] var conts = ByteStack.create(16) // use ByteStack to interact
private[this] val objectState: ArrayStack[AnyRef] = new ArrayStack()

/* fast-path to head */
Expand Down Expand Up @@ -345,7 +345,7 @@ private final class IOFiber[A](

case _ =>
objectState.push(f)
conts.push(MapK)
conts = ByteStack.push(conts, MapK)
runLoop(ioe, nextCancelation, nextAutoCede)
}

Expand Down Expand Up @@ -410,7 +410,7 @@ private final class IOFiber[A](

case _ =>
objectState.push(f)
conts.push(FlatMapK)
conts = ByteStack.push(conts, FlatMapK)
runLoop(ioe, nextCancelation, nextAutoCede)
}

Expand Down Expand Up @@ -475,7 +475,7 @@ private final class IOFiber[A](
runLoop(succeeded(Right(trace), 0), nextCancelation - 1, nextAutoCede)

case _ =>
conts.push(AttemptK)
conts = ByteStack.push(conts, AttemptK)
runLoop(ioa, nextCancelation, nextAutoCede)
}

Expand All @@ -485,7 +485,7 @@ private final class IOFiber[A](
pushTracingEvent(cur.event)

objectState.push(cur.f)
conts.push(HandleErrorWithK)
conts = ByteStack.push(conts, HandleErrorWithK)

runLoop(cur.ioa, nextCancelation, nextAutoCede)

Expand All @@ -509,7 +509,7 @@ private final class IOFiber[A](
* the OnCancelK marker is used by `succeeded` to remove the
* finalizer when `ioa` completes uninterrupted.
*/
conts.push(OnCancelK)
conts = ByteStack.push(conts, OnCancelK)
runLoop(cur.ioa, nextCancelation, nextAutoCede)

case 12 =>
Expand All @@ -527,7 +527,7 @@ private final class IOFiber[A](
* The uncancelableK marker is used by `succeeded` and `failed`
* to unmask once body completes.
*/
conts.push(UncancelableK)
conts = ByteStack.push(conts, UncancelableK)
runLoop(cur.body(poll), nextCancelation, nextAutoCede)

case 13 =>
Expand All @@ -543,7 +543,7 @@ private final class IOFiber[A](
* The UnmaskK marker gets used by `succeeded` and `failed`
* to restore masking state after `cur.ioa` has finished
*/
conts.push(UnmaskK)
conts = ByteStack.push(conts, UnmaskK)
}

runLoop(cur.ioa, nextCancelation, nextAutoCede)
Expand Down Expand Up @@ -708,7 +708,7 @@ private final class IOFiber[A](
()
}
finalizers.push(fin)
conts.push(OnCancelK)
conts = ByteStack.push(conts, OnCancelK)

if (state.compareAndSet(ContStateInitial, ContStateWaiting)) {
/*
Expand Down Expand Up @@ -893,7 +893,7 @@ private final class IOFiber[A](
val ec = cur.ec
currentCtx = ec
ctxs.push(ec)
conts.push(EvalOnK)
conts = ByteStack.push(conts, EvalOnK)

resumeTag = EvalOnR
resumeIO = cur.ioa
Expand Down Expand Up @@ -956,7 +956,7 @@ private final class IOFiber[A](

/* clear out literally everything to avoid any possible memory leaks */

conts.invalidate()
conts = null
objectState.invalidate()
finalizers.invalidate()
ctxs.invalidate()
Expand All @@ -968,8 +968,8 @@ private final class IOFiber[A](
finalizing = true

if (!finalizers.isEmpty()) {
conts.init(16)
conts.push(CancelationLoopK)
conts = ByteStack.create(8)
conts = ByteStack.push(conts, CancelationLoopK)

objectState.init(16)
objectState.push(cb)
Expand Down Expand Up @@ -1043,7 +1043,7 @@ private final class IOFiber[A](

@tailrec
private[this] def succeeded(result: Any, depth: Int): IO[Any] =
(conts.pop(): @switch) match {
(ByteStack.pop(conts): @switch) match {
case 0 => mapK(result, depth)
case 1 => flatMapK(result, depth)
case 2 => cancelationLoopSuccessK()
Expand All @@ -1064,16 +1064,16 @@ private final class IOFiber[A](
Tracing.augmentThrowable(runtime.config.enhancedExceptions, error, tracingEvents)

// println(s"<$name> failed() with $error")
val buffer = conts.unsafeBuffer()
/*val buffer = conts.unsafeBuffer()
var i = conts.unsafeIndex() - 1
val orig = i
var k: Byte = -1
/*
* short circuit on error by dropping map and flatMap continuations
* until we hit a continuation that needs to deal with errors.
*/
while (i >= 0 && k < 0) {
if (buffer(i) <= FlatMapK)
i -= 1
Expand All @@ -1082,12 +1082,15 @@ private final class IOFiber[A](
}
conts.unsafeSet(i)
objectState.unsafeSet(objectState.unsafeIndex() - (orig - i))
objectState.unsafeSet(objectState.unsafeIndex() - (orig - i))*/

/* has to be duplicated from succeeded to ensure call-site monomorphism */
(k: @switch) match {
(ByteStack.pop(conts): @switch) match {
/* (case 0) will never continue to mapK */
/* (case 1) will never continue to flatMapK */
case 0 | 1 =>
objectState.pop()
failed(error, depth)
case 2 => cancelationLoopFailureK(error)
case 3 => runTerminusFailureK(error)
case 4 => evalOnFailureK(error)
Expand Down Expand Up @@ -1152,8 +1155,8 @@ private final class IOFiber[A](
if (canceled) {
done(IOFiber.OutcomeCanceled.asInstanceOf[OutcomeIO[A]])
} else {
conts.init(16)
conts.push(RunTerminusK)
conts = ByteStack.create(8)
conts = ByteStack.push(conts, RunTerminusK)

objectState.init(16)
finalizers.init(16)
Expand Down Expand Up @@ -1267,7 +1270,7 @@ private final class IOFiber[A](

private[this] def cancelationLoopSuccessK(): IO[Any] = {
if (!finalizers.isEmpty()) {
conts.push(CancelationLoopK)
conts = ByteStack.push(conts, CancelationLoopK)
runLoop(finalizers.pop(), cancelationCheckThreshold, autoYieldThreshold)
} else {
/* resume external canceller */
Expand Down
Loading

0 comments on commit 30168fc

Please sign in to comment.