Skip to content

Commit

Permalink
Make some async api work on Scala.js and Native (#12)
Browse files Browse the repository at this point in the history
* Move API only working on JVM to JVM specific code

* Move shared JS/Native code to src-js-native

* Use package private traits to add functionality

This implements a different approach which is NOT binary
compatible, but is source compatible and avoid duplicating code.
The idea is to have mixins with different implementations
for every object or trait that has functions existing on JVM
only

* Fix Js/Native and make ContextSimpleCompanionObject private
  • Loading branch information
lolgab authored Sep 15, 2021
1 parent 675137f commit baf69d1
Show file tree
Hide file tree
Showing 12 changed files with 151 additions and 82 deletions.
6 changes: 6 additions & 0 deletions build.sc
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ object castor extends Module {
def platformSegment = "js"
def scalaJSVersion = crossScalaJsVersion
def millSourcePath = super.millSourcePath / os.up
override def sources = T.sources {
super.sources() ++ Seq(PathRef(millSourcePath / "src-js-native"))
}
object test extends Tests with ActorTestModule {
def platformSegment = "js"
def scalaVersion = crossScalaVersion
Expand All @@ -86,6 +89,9 @@ object castor extends Module {
def platformSegment = "native"
def scalaNativeVersion = crossScalaNativeVersion
def millSourcePath = super.millSourcePath / os.up
override def sources = T.sources {
super.sources() ++ Seq(PathRef(millSourcePath / "src-js-native"))
}
object test extends Tests with ActorTestModule {
def platformSegment = "native"
}
Expand Down
3 changes: 3 additions & 0 deletions castor/src-js-native/Context.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package castor.platform

private [castor] trait Context
5 changes: 5 additions & 0 deletions castor/src-js-native/ContextCompanionObject.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package castor.platform

private [castor] trait ContextCompanionObject {
private [castor] trait ContextSimpleCompanionObject
}
3 changes: 3 additions & 0 deletions castor/src-js-native/ContextImpl.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package castor.platform

private [castor] trait ContextImpl
7 changes: 7 additions & 0 deletions castor/src-js-native/Platform.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package castor.platform

import scala.concurrent.ExecutionContext

private [castor] object Platform {
def executionContext = ExecutionContext.global
}
9 changes: 9 additions & 0 deletions castor/src-jvm/castor/platform/Context.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package castor.platform

import castor._

private [castor] trait Context {
def scheduleMsg[T](a: Actor[T], msg: T, time: java.time.Duration)
(implicit fileName: sourcecode.FileName,
line: sourcecode.Line): Unit
}
47 changes: 47 additions & 0 deletions castor/src-jvm/castor/platform/ContextCompanionObject.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package castor.platform

import castor._

import scala.concurrent.ExecutionContext

import java.util.concurrent.{Executors, ThreadFactory}

private [castor] trait ContextCompanionObject {
/**
* Castor actor context based on a fixed thread pool
*/
def makeThreadPool(numThreads: Int, daemon: Boolean) = Executors.newFixedThreadPool(
numThreads,
new ThreadFactory {
override def newThread(r: Runnable) = {
val t = new Thread(r)
t.setDaemon(daemon)
t
}
}
)
class ThreadPool(numThreads: Int = Runtime.getRuntime().availableProcessors(),
daemon: Boolean = true,
logEx: Throwable => Unit = _.printStackTrace()) extends Context.Impl{
val threadPool = makeThreadPool(numThreads, daemon)

val executionContext = ExecutionContext.fromExecutorService(threadPool)

def reportFailure(cause: Throwable): Unit = logEx(cause)
def shutdown() = threadPool.shutdownNow()
}

/**
* [[castor.Context.ThreadPool]] used for testing; tracks scheduling and completion of
* tasks and futures, so we can support a `.waitForInactivity()` method to wait
* until the system is quiescient
*/
class TestThreadPool(numThreads: Int = Runtime.getRuntime().availableProcessors(),
daemon: Boolean = true,
logEx: Throwable => Unit = _.printStackTrace())
extends ThreadPool(numThreads, daemon, logEx) with Context.TestBase

private [castor] trait ContextSimpleCompanionObject {
lazy val threadPool = makeThreadPool(Runtime.getRuntime().availableProcessors(), true)
}
}
34 changes: 34 additions & 0 deletions castor/src-jvm/castor/platform/ContextImpl.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package castor.platform

import java.util.concurrent.{Executors, ThreadFactory, TimeUnit}

import castor._

private [castor] trait ContextImpl extends Context { this: Context.Impl =>
lazy val scheduler = Executors.newSingleThreadScheduledExecutor(
new ThreadFactory {
def newThread(r: Runnable): Thread = {
val t = new Thread(r, "ActorContext-Scheduler-Thread")
t.setDaemon(true)
t
}
}
)

def scheduleMsg[T](a: Actor[T],
msg: T, delay: java.time.Duration)
(implicit fileName: sourcecode.FileName,
line: sourcecode.Line) = {
val token = reportSchedule(a, msg, fileName, line)
scheduler.schedule[Unit](
new java.util.concurrent.Callable[Unit] {
def call(): Unit = {
a.send(msg)(fileName, line)
reportComplete(token)
}
},
delay.toMillis,
TimeUnit.MILLISECONDS
)
}
}
11 changes: 11 additions & 0 deletions castor/src-jvm/castor/platform/Platform.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package castor.platform

import castor._

import scala.concurrent.ExecutionContext

import java.util.concurrent.{Executors, ThreadFactory, TimeUnit}

private [castor] object Platform {
def executionContext = ExecutionContext.fromExecutorService(Context.Simple.threadPool)
}
11 changes: 11 additions & 0 deletions castor/src/Actor.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package castor

trait Actor[T]{
def send(t: T)
(implicit fileName: sourcecode.FileName,
line: sourcecode.Line): Unit

def sendAsync(f: scala.concurrent.Future[T])
(implicit fileName: sourcecode.FileName,
line: sourcecode.Line): Unit
}
87 changes: 6 additions & 81 deletions castor/src/Context.scala
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
package castor

import java.util.concurrent.{Executors, ThreadFactory, TimeUnit}

import scala.concurrent.{Await, ExecutionContext, Future, Promise}
Expand All @@ -8,7 +9,7 @@ import scala.concurrent.{Await, ExecutionContext, Future, Promise}
* schedule messages to be sent later, and hooks to track the current number of
* outstanding tasks or log the actor message sends for debugging purposes
*/
trait Context extends ExecutionContext {
trait Context extends ExecutionContext with platform.Context {
def reportSchedule(): Context.Token = new Context.Token.Simple()

def reportSchedule(fileName: sourcecode.FileName,
Expand All @@ -29,18 +30,14 @@ trait Context extends ExecutionContext {

def reportComplete(token: Context.Token): Unit = ()

def scheduleMsg[T](a: Actor[T], msg: T, time: java.time.Duration)
(implicit fileName: sourcecode.FileName,
line: sourcecode.Line): Unit

def future[T](t: => T)
(implicit fileName: sourcecode.FileName,
line: sourcecode.Line): Future[T]

def execute(runnable: Runnable): Unit
}

object Context{
object Context extends platform.ContextCompanionObject {
trait Token
object Token{
class Simple extends Token(){
Expand Down Expand Up @@ -72,35 +69,10 @@ object Context{
def reportFailure(t: Throwable) = logEx(t)
}

object Simple{
lazy val threadPool = makeThreadPool(Runtime.getRuntime().availableProcessors(), true)
lazy val executionContext = ExecutionContext.fromExecutorService(threadPool)
object Simple extends ContextSimpleCompanionObject {
lazy val executionContext = platform.Platform.executionContext
implicit lazy val global: Simple = new Simple(executionContext, _.printStackTrace())
}
def makeThreadPool(numThreads: Int, daemon: Boolean) = Executors.newFixedThreadPool(
numThreads,
new ThreadFactory {
override def newThread(r: Runnable) = {
val t = new Thread(r)
t.setDaemon(daemon)
t
}
}
)

/**
* Castor actor context based on a fixed thread pool
*/
class ThreadPool(numThreads: Int = Runtime.getRuntime().availableProcessors(),
daemon: Boolean = true,
logEx: Throwable => Unit = _.printStackTrace()) extends Context.Impl{
val threadPool = makeThreadPool(numThreads, daemon)

val executionContext = ExecutionContext.fromExecutorService(threadPool)

def reportFailure(cause: Throwable): Unit = logEx(cause)
def shutdown() = threadPool.shutdownNow()
}

/**
* [[castor.Context.Simple]] used for testing; tracks scheduling and completion of
Expand All @@ -111,16 +83,6 @@ object Context{
logEx: Throwable => Unit = _.printStackTrace())
extends Simple(executionContext, logEx) with TestBase

/**
* [[castor.Context.ThreadPool]] used for testing; tracks scheduling and completion of
* tasks and futures, so we can support a `.waitForInactivity()` method to wait
* until the system is quiescient
*/
class TestThreadPool(numThreads: Int = Runtime.getRuntime().availableProcessors(),
daemon: Boolean = true,
logEx: Throwable => Unit = _.printStackTrace())
extends ThreadPool(numThreads, daemon, logEx) with TestBase

trait TestBase extends Context.Impl {
def executionContext: ExecutionContext
private[this] val active = collection.mutable.Set.empty[Context.Token]
Expand Down Expand Up @@ -167,7 +129,7 @@ object Context{
}
}

trait Impl extends Context {
trait Impl extends Context with platform.ContextImpl {
def executionContext: ExecutionContext

def execute(runnable: Runnable): Unit = {
Expand All @@ -193,43 +155,6 @@ object Context{
})
p.future
}

lazy val scheduler = Executors.newSingleThreadScheduledExecutor(
new ThreadFactory {
def newThread(r: Runnable): Thread = {
val t = new Thread(r, "ActorContext-Scheduler-Thread")
t.setDaemon(true)
t
}
}
)

def scheduleMsg[T](a: Actor[T],
msg: T, delay: java.time.Duration)
(implicit fileName: sourcecode.FileName,
line: sourcecode.Line) = {
val token = reportSchedule(a, msg, fileName, line)
scheduler.schedule[Unit](
new java.util.concurrent.Callable[Unit] {
def call(): Unit = {
a.send(msg)(fileName, line)
reportComplete(token)
}
},
delay.toMillis,
TimeUnit.MILLISECONDS
)
}
}

}

trait Actor[T]{
def send(t: T)
(implicit fileName: sourcecode.FileName,
line: sourcecode.Line): Unit

def sendAsync(f: scala.concurrent.Future[T])
(implicit fileName: sourcecode.FileName,
line: sourcecode.Line): Unit
}
10 changes: 9 additions & 1 deletion castor/test/src/ActorsTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,13 @@ object ActorsTest extends TestSuite{
}
}
}
test("async"){
import Context.Simple.global
import scala.concurrent.Future
object foo extends SimpleActor[Unit]{
def run(msg: Unit) = ()
}
foo.sendAsync(Future(()))
}
}
}
}

0 comments on commit baf69d1

Please sign in to comment.