Skip to content

Commit

Permalink
Merge pull request #492 from p2t2/DEV3.2.1
Browse files Browse the repository at this point in the history
Dev3.2.1
  • Loading branch information
mreposa committed Jul 2, 2015
2 parents ec0638e + 85f5b80 commit c0315df
Show file tree
Hide file tree
Showing 53 changed files with 2,560 additions and 347 deletions.
2 changes: 2 additions & 0 deletions Figaro/.gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
/bin/
/.cache-main
/.cache-tests
2 changes: 1 addition & 1 deletion Figaro/META-INF/MANIFEST.MF
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ Manifest-Version: 1.0
Bundle-ManifestVersion: 2
Bundle-Name: Figaro
Bundle-SymbolicName: com.cra.figaro
Bundle-Version: 3.2.0
Bundle-Version: 3.2.1
Export-Package: com.cra.figaro.algorithm,
com.cra.figaro.algorithm.decision,
com.cra.figaro.algorithm.decision.index,
Expand Down
2 changes: 1 addition & 1 deletion Figaro/figaro_build.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=3.2.0.0
version=3.2.1.0
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ class UnsupportedAlgorithmException(element: Element[_]) extends RuntimeExceptio
class AlgorithmException extends RuntimeException
class AlgorithmInactiveException extends AlgorithmException
class AlgorithmActiveException extends AlgorithmException
class NotATargetException[T](target: Element[T]) extends AlgorithmException

/**
* The general class of Figaro algorithms. The Algorithm class is defined to generalize both
Expand All @@ -47,22 +46,22 @@ trait Algorithm {
/*
* Start the algorithm. After it returns, the algorithm must be ready to provide answers.
*/
protected def doStart(): Unit
protected[algorithm] def doStart(): Unit

/*
* Stop the algorithm from computing. The algorithm is still ready to provide answers after it returns.
*/
protected def doStop(): Unit
protected[algorithm] def doStop(): Unit

/*
* Resume the computation of the algorithm, if it has been stopped.
*/
protected def doResume(): Unit
protected[algorithm] def doResume(): Unit

/*
* Kill the algorithm so that it is inactive. It will no longer be able to provide answers.
*/
protected def doKill(): Unit
protected[algorithm] def doKill(): Unit

protected var active = false

Expand Down
105 changes: 76 additions & 29 deletions Figaro/src/main/scala/com/cra/figaro/algorithm/Anytime.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,13 @@ package com.cra.figaro.algorithm
import com.cra.figaro.language._
import akka.actor._
import com.typesafe.config.ConfigFactory
import akka.util.Timeout
import java.util.concurrent.TimeUnit
import akka.pattern.{ ask }
import scala.concurrent.Await
import scala.concurrent.Future
import java.util.concurrent.TimeoutException
import scala.concurrent.duration.Duration

/**
* Class of services implemented by the anytime algorithm.
Expand All @@ -26,8 +33,14 @@ abstract class Service
* Class of responses to services.
*/
abstract class Response

/**
* Ack Response (String)
*/
case object AckResponse extends Response

/**
* General Response (String)
* Exception Response (String)
*/
case class ExceptionResponse(msg: String) extends Response

Expand All @@ -40,6 +53,9 @@ sealed abstract class Message
*/
case class Handle(service: Service) extends Message


class AnytimeAlgorithmException(s: String) extends RuntimeException(s)

/**
* An anytime algorithm is able to improve its estimated answers over time. Anytime algorithms run in their
* own thread using an actor.
Expand All @@ -50,15 +66,6 @@ case class Handle(service: Service) extends Message
*/

trait Anytime extends Algorithm {
/**
* Run a single step of the algorithm. The algorithm must be able to provide answers after each step.
*/
def runStep(): Unit

/**
* Optional function to run when the algorithm is stopped (not killed). Used in samplers to update lazy values.
*/
def stopUpdate(): Unit = { }

/**
* A class representing the actor running the algorithm.
Expand All @@ -71,7 +78,8 @@ trait Anytime extends Algorithm {
sender ! handle(service)
case "stop" =>
stopUpdate()
become (inactive)
sender ! AckResponse
become(inactive)
case "next" =>
runStep()
self ! "next"
Expand All @@ -89,9 +97,11 @@ trait Anytime extends Algorithm {
case "resume" =>
resume()
become(active)
self ! "next"
self ! "next"
case "kill" =>
become(shuttingDown)
cleanUp()
sender ! AckResponse
become(shuttingDown)
case _ =>
sender ! ExceptionResponse("Algorithm is stopped")
}
Expand All @@ -102,8 +112,6 @@ trait Anytime extends Algorithm {
}

def receive = inactive


}

/**
Expand All @@ -121,41 +129,80 @@ trait Anytime extends Algorithm {
var runner: ActorRef = null
var running = false;

/**
* default message timeout. Increase if queries to the algorithm fail due to timeout
*/
implicit var messageTimeout = Timeout(5000, TimeUnit.MILLISECONDS)

/**
* Run a single step of the algorithm. The algorithm must be able to provide answers after each step.
*/
def runStep(): Unit

/**
* Optional function to run when the algorithm is stopped (not killed). Used in samplers to update lazy values.
*/
def stopUpdate(): Unit = {}

/**
* A handler of services provided by the algorithm.
*/
def handle(service: Service): Response


protected def doStart() = {
protected[algorithm] def doStart() = {
if (!running) {
system = ActorSystem("Anytime", ConfigFactory.load(customConf))
runner = system.actorOf(Props(new Runner))
initialize()
running = true
system = ActorSystem("Anytime", ConfigFactory.load(customConf))
runner = system.actorOf(Props(new Runner))
initialize()
running = true
}

runner ! "start"
}

protected def doStop() = runner ! "stop"
protected[algorithm] def doStop() = runner ! "stop"

protected def doResume() = runner ! "resume"
protected[algorithm] def doResume() = runner ! "resume"

protected def doKill() = {
protected[algorithm] def doKill() = {
shutdown
}

/**
* Release all resources from this anytime algorithm.
*/
def shutdown {
cleanUp()
if (running)
{
runner ! "kill"
system.stop(runner)
system.shutdown
if (running) {
awaitResponse(runner ? "kill", messageTimeout.duration)
system.stop(runner)
system.shutdown
}
}

/*
* A helper function to query the running thread and await a response.
* In the case that it times out, it will print a message that it timed out and return an exception response.
* Note, on a time, it does NOT throw an exception.
*/
protected def awaitResponse(response: Future[Any], duration: Duration): Response = {
try {
val result = Await.result(response, duration)
result match {
case e: ExceptionResponse => {
println(e.msg)
e
}
case r: Response => r
case _ => throw new AnytimeAlgorithmException("Unknown Response")
}
} catch {
case to: TimeoutException => {
println("Error! Did not receive a response from algorithm thread - it may be hanging or taking an exceptionally long time to respond. Try increasing messageTimeout.")
ExceptionResponse("Timeout")
}
case e: Exception => throw e
}
}

}
13 changes: 12 additions & 1 deletion Figaro/src/main/scala/com/cra/figaro/algorithm/AnytimeMPE.scala
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package com.cra.figaro.algorithm

import com.cra.figaro.language._
import akka.pattern.{ask}

/**
* Anytime algorithms that compute most likely values of elements.
Expand All @@ -25,7 +26,7 @@ trait AnytimeMPE extends MPEAlgorithm with Anytime {
* A message instructing the handler to compute the most likely value of the target element.
*/
case class ComputeMostLikelyValue[T](target: Element[T]) extends Service

/**
* A message from the handler containing the most likely value of the previously requested element.
*/
Expand All @@ -36,4 +37,14 @@ trait AnytimeMPE extends MPEAlgorithm with Anytime {
case ComputeMostLikelyValue(target) =>
MostLikelyValue(mostLikelyValue(target))
}

protected def doMostLikelyValue[T](target: Element[T]): T = {
awaitResponse(runner ? Handle(ComputeMostLikelyValue(target)), messageTimeout.duration) match {
case MostLikelyValue(result) => result.asInstanceOf[T]
case _ => {
println("Error: Response not recognized from algorithm")
target.value
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,18 +37,12 @@ trait AnytimeProbEvidence extends ProbEvidenceAlgorithm with Anytime {
/**
* Returns the probability of evidence of the universe on which the algorithm operates.
* Throws AlgorithmInactiveException if the algorithm is not active.
*/
implicit val timeout = Timeout(5000, TimeUnit.MILLISECONDS)
*/
def probabilityOfEvidence(): Double = {
if (!active) throw new AlgorithmInactiveException
val response = runner ? Handle(ComputeProbEvidence)
Await.result(response, timeout.duration ).asInstanceOf[Response] match {
awaitResponse(runner ? Handle(ComputeProbEvidence), messageTimeout.duration) match {
case ProbEvidence(result) => result
case ExceptionResponse(msg) =>
println(msg)
0.0
case _ => 0.0
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,14 @@ trait AnytimeProbQuery extends ProbQueryAlgorithm with Anytime {
* A message from the handler containing the probability of the previously requested predicate and element.
*/
case class Probability(probability: Double) extends Response
/**
* A message instructing the handler to compute the projection of the target element.
*/
case class ComputeProjection[T](target: Element[T]) extends Service
/**
* A message from the handler containing the projection of the previously requested element.
*/
case class Projection[T](projection: List[(T, Double)]) extends Response

def handle(service: Service): Response =
service match {
Expand All @@ -60,38 +68,33 @@ trait AnytimeProbQuery extends ProbQueryAlgorithm with Anytime {
case ComputeProbability(target, predicate) =>
Probability(computeProbability(target, predicate))
}

implicit val timeout = Timeout(5000, TimeUnit.MILLISECONDS)

protected def doDistribution[T](target: Element[T]): Stream[(Double, T)] = {
val response = runner ? Handle(ComputeDistribution(target))
Await.result(response, timeout.duration ).asInstanceOf[Response] match {
awaitResponse(runner ? Handle(ComputeDistribution(target)), messageTimeout.duration) match {
case Distribution(result) => result.asInstanceOf[Stream[(Double, T)]]
case ExceptionResponse(msg) =>
println(msg)
Stream()
case _ => Stream()
}
}
}

protected def doExpectation[T](target: Element[T], function: T => Double): Double = {
val response = runner ? Handle(ComputeExpectation(target, function))
Await.result(response, timeout.duration ).asInstanceOf[Response] match {
awaitResponse(runner ? Handle(ComputeExpectation(target, function)), messageTimeout.duration) match {
case Expectation(result) => result
case ExceptionResponse(msg) =>
println(msg)
0.0
case _ => 0.0
}
}
}

protected override def doProbability[T](target: Element[T], predicate: T => Boolean): Double = {
val response = runner ? Handle(ComputeProbability(target, predicate))
Await.result(response, timeout.duration ).asInstanceOf[Response] match {
awaitResponse(runner ? Handle(ComputeProbability(target, predicate)), messageTimeout.duration) match {
case Probability(result) => result
case ExceptionResponse(msg) =>
println(msg)
0.0
case _ => 0.0
}
}
}

protected override def doProjection[T](target: Element[T]): List[(T, Double)] = {
awaitResponse(runner ? Handle(ComputeProjection(target)), messageTimeout.duration) match {
case Projection(result) => result.asInstanceOf[List[(T, Double)]]
case _ => List()
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,7 @@ trait MPEAlgorithm extends Algorithm {
* Returns the most likely value for the target element.
*/
def mostLikelyValue[T](target: Element[T]): T

// Defined in one time or anytime versions
protected def doMostLikelyValue[T](target: Element[T]): T
}
8 changes: 4 additions & 4 deletions Figaro/src/main/scala/com/cra/figaro/algorithm/OneTime.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,16 @@ trait OneTime extends Algorithm {
*/
def run(): Unit

protected def doStart() = {
protected[algorithm] def doStart() = {
initialize()
run()
}

protected def doStop() = ()
protected[algorithm] def doStop() = ()

protected def doResume() = ()
protected[algorithm] def doResume() = ()

protected def doKill() = {
protected[algorithm] def doKill() = {
cleanUp()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,6 @@ import com.cra.figaro.language._
* One-time algorithms that compute the most likely values of elements.
* A class that implements this trait must implement run and mostLikelyValue methods.
*/
trait OneTimeMPE extends MPEAlgorithm with OneTime
trait OneTimeMPE extends MPEAlgorithm with OneTime {
protected def doMostLikelyValue[T](target: Element[T]): T = mostLikelyValue(target)
}
Loading

0 comments on commit c0315df

Please sign in to comment.