
replaced DStreamProp.forAll by trait DStreamTLProperty
juanrh committed Jan 11, 2016
1 parent ba186cc commit 25433de
Showing 5 changed files with 266 additions and 16 deletions.
13 changes: 10 additions & 3 deletions RELEASE_NOTES.md
@@ -1,9 +1,16 @@
-# sscheck 0.0.1 - RDD generators
-
-Shared Spark context for ScalaCheck generators based on parallelization of lists, through the integration of ScalaCheck and specs2
+# sscheck 0.2.1-SNAPSHOT
+Bug-fix implementation of temporal properties. Uses of `DStreamProp.forAll` combined with extending the trait `SharedStreamingContextBeforeAfterEach` should be replaced by extending the trait `DStreamTLProperty` and calling `forAllDStream`. This solves:
+
+* Execution of a test case is now independent from the others, as a new streaming context is created for each test case. This is particularly important for stateful DStream transformations.
+* Replaced uses of `DynSingleSeqQueueInputDStream` by `TestInputStream` from [spark-testing-base](https://github.com/holdenk/spark-testing-base), which implements checkpointing correctly.
+* Fixed [#32](https://github.com/juanrh/sscheck/issues/32) and [#31](https://github.com/juanrh/sscheck/issues/31).

-# sscheck 0.0.2 - Temporal logic generators and properties
+# sscheck 0.2.0 - Temporal logic generators and properties
First implementation of a temporal logic for testing Spark Streaming with ScalaCheck. This allows defining:

* Generators for DStreams defined by temporal logic formulas.
* Properties for testing functions over DStreams, using a ScalaCheck generator and a propositional temporal logic formula as the assertion. `DStreamProp.forAll` defines a property that is universally quantified over the generated test cases.
+
+# sscheck 0.1.0 - RDD generators
+
+Shared Spark context for ScalaCheck generators based on parallelization of lists, through the integration of ScalaCheck and specs2
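To make the migration concrete, here is a minimal sketch of the change a client spec needs. The spec name and body are illustrative, not part of this commit; the real diffs are in the test files below.

```scala
import org.apache.spark.streaming.Duration
import es.ucm.fdi.sscheck.prop.tl.DStreamTLProperty

// Before (sscheck 0.2.0) the spec mixed in SharedStreamingContextBeforeAfterEach
// and called DStreamProp.forAll(gen)(testSubject)(formula).
// After (sscheck 0.2.1-SNAPSHOT) the spec mixes in DStreamTLProperty instead:
class MyStreamingSpec
  extends org.specs2.Specification
  with DStreamTLProperty
  with org.specs2.ScalaCheck {

  // required by DStreamTLProperty: batch interval for the per-test-case contexts
  override def batchDuration = Duration(300)

  def is = s2"""my temporal property $myProp"""

  // the property body is unchanged, except that
  // DStreamProp.forAll(gen)(testSubject)(formula) becomes
  // forAllDStream(gen)(testSubject)(formula)
  def myProp = ??? // see the full sketch after the DStreamTLProperty trait below
}
```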
249 changes: 246 additions & 3 deletions src/main/scala/es/ucm/fdi/sscheck/prop/tl/DStreamProp.scala
@@ -13,7 +13,7 @@ import org.specs2.scalacheck.AsResultProp
import org.apache.spark._
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream
-import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.{StreamingContext, Duration}
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}
import scala.reflect.ClassTag
import scala.concurrent.SyncVar
@@ -26,6 +26,7 @@ import scala.util.Properties.lineSeparator

import org.apache.spark.streaming.dstream.{DynSingleSeqQueueInputDStream}
import es.ucm.fdi.sscheck.spark.Parallelism
+import es.ucm.fdi.sscheck.spark.SharedSparkContextBeforeAfterAll
import es.ucm.fdi.sscheck.{TestCaseIdCounter,PropResult,TestCaseId,TestCaseRecord}

class PropExecutionException(msg : String)
@@ -39,6 +40,7 @@ object TestCaseTimeoutException {
case class TestCaseTimeoutException(msg : String)
  extends PropExecutionException(msg)

+@deprecated(message="Instead extend DStreamTLProperty and call forAllDStream", since = "0.2.1-SNAPSHOT")
object DStreamProp {
  @transient private val logger = Logger(LoggerFactory.getLogger("DStreamProp"))

@@ -82,7 +84,7 @@ ${rdd.take(numSampleRecords).mkString(lineSeparator)}
  // - only Prop.forall writes to resetFormula, to signal foreachRDD to reset the formula
  // - only foreachRDD writes to currFormula
  // - Prop.forall reads currFormula, and foreachRDD reads resetFormula
-  @volatile var currFormula : NextFormula[U] = null // effectively initialized to formulaNext, if the code is ok
+  @volatile var currFormula : NextFormula[U] = formulaNext // this needs to be explicitly initialized, see #32
  // for "the cheap read-write lock trick" https://www.ibm.com/developerworks/java/library/j-jtp06197/
  val currFormulaLock = new Serializable{}
  @volatile var resetFormula = true
@@ -191,7 +193,248 @@ ${rdd.take(numSampleRecords).mkString(lineSeparator)}
}
}
}
}

object DStreamTLProperty {
  @transient private val logger = Logger(LoggerFactory.getLogger("DStreamTLProperty"))
  val defaultTimeout = Timeout(10)
}

trait DStreamTLProperty
  extends SharedSparkContextBeforeAfterAll {

  import DStreamTLProperty.{logger, defaultTimeout}

  /** Override for custom configuration
  * */
  def batchDuration : Duration

  /** Override for custom configuration
  *  Disabled by default because it is quite costly
  * */
  def enableCheckpointing : Boolean = false

  /** @return a newly created streaming context, for which no DStream or action has
  *  been defined, and that is not started
  *
  *  Precondition: no streaming context is currently started in this JVM
  * */
  def buildFreshStreamingContext() : StreamingContext = {
    val __sc = sc()
    logger.warn(s"creating test Spark Streaming context")
    val newSsc = new StreamingContext(__sc, batchDuration)
    if (enableCheckpointing) {
      val checkpointDir = com.holdenkarau.spark.testing.Utils.createTempDir().toString
      logger.warn(s"configuring Spark Streaming checkpoint directory ${checkpointDir}")
      newSsc.checkpoint(checkpointDir)
    }
    newSsc
  }

  /** @return a ScalaCheck property that is executed by:
  *  - generating a prefix of a DStream with g1
  *  - generating a derived DStream with gt1
  *  - checking formula on those DStreams
  *
  *  The property is satisfied iff all the test cases satisfy the formula.
  *  A new streaming context is created for each test case to isolate its
  *  execution, which is particularly relevant if gt1 is stateful
  *
  *  WARNING: the resulting Prop cannot be configured for parallel execution of
  *  test cases, in order to avoid having more than a single StreamingContext
  *  started in the same JVM
  * */
  def forAllDStream[E1:ClassTag,E2:ClassTag,T <% Timeout]
    (g1 : Gen[Seq[Seq[E1]]])(gt1 : (DStream[E1]) => DStream[E2])
    (formula : Formula[(RDD[E1], RDD[E2])], on : T = defaultTimeout)
    (implicit pp1 : Seq[Seq[E1]] => Pretty) : Prop = {
    val formulaNext = formula.nextFormula
    // test case id counter / generator
    val testCaseIdCounter = new TestCaseIdCounter

    // Create a new streaming context per test case, and use it to create a new TestCaseContext
    // that will use TestInputStream from spark-testing-base to create new input and output
    // dstreams, and register a foreachRDD action to evaluate the formula
    Prop.forAllNoShrink (g1) { (testCaseDstream : Seq[Seq[E1]]) =>
      // set up a new test case: a streaming context is created, started and
      // stopped for each test case, to isolate their executions
      val testCaseId : TestCaseId = testCaseIdCounter.nextId()
      // create a fresh streaming context for this test case, and pass it unstarted to
      // a new TestCaseContext, which will set up the streams and actions, and start the context
      val freshSsc = buildFreshStreamingContext()
      val testCaseContext = new TestCaseContext[E1, E2](testCaseDstream, gt1, formulaNext)(freshSsc, parallelism)
      // we use propFailed to stop in the middle of the test case as soon as a counterexample is found
      // Note: propFailed is not equivalent to currFormula.result.isDefined, because propFailed is
      // only defined after a wait for onBatchCompleted
      var propFailed = false
      logger.warn(s"starting test case $testCaseId")
      for (i <- 1 to testCaseDstream.length if (! propFailed)) {
        // wait for batch completion
        logger.debug(s"waiting end of batch ${i} of test case ${testCaseId} at ${Thread.currentThread()}")
        testCaseContext.waitForBatch()
        logger.debug(s"awake after end of batch ${i} of test case ${testCaseId} at ${Thread.currentThread()}")
        if (testCaseContext.currFormula.result.isDefined && {
            val currFormulaResult = testCaseContext.currFormula.result.get
            currFormulaResult == Prop.False || currFormulaResult.isInstanceOf[Prop.Exception]
          }) {
          // some batch generated a counterexample
          propFailed = true
        }
        // else do nothing, as the data is already sent
      }
      // the execution of this test case is completed
      testCaseContext.stop() // note this does nothing if it was already stopped

      // using Prop.Undecided allows us to return undecided if the test case is not
      // long enough (i.e. it is a word with not enough letters) to get a conclusive
      // formula evaluation
      val testCaseResult = testCaseContext.currFormula.result.getOrElse(Prop.Undecided)

      // Note: ScalaCheck will show the correct test case that caused the counterexample,
      // because only the test case that generated that counterexample will fail. Anyway, we could
      // use testCaseResult.mapMessage here to add testCaseDstream to the message of
      // testCaseResult if we needed it
      logger.warn(s"finished test case $testCaseId with result $testCaseResult")
      testCaseResult match {
        case Prop.True => Prop.passed
        case Prop.Proof => Prop.proved
        case Prop.False => Prop.falsified
        case Prop.Undecided => Prop.passed // Prop.undecided FIXME make configurable
        case Prop.Exception(e) => Prop.exception(e)
      }
    }
  }
}
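As a usage illustration, here is a hedged sketch of a complete property built with `forAllDStream`, assembled from the demo specs changed further below; the spec class name and the concrete numbers are illustrative, not part of this commit.

```scala
import org.junit.runner.RunWith
import org.specs2.runner.JUnitRunner
import org.scalacheck.Gen
import org.scalacheck.Arbitrary.arbitrary
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.Duration
import org.apache.spark.streaming.dstream.DStream
import es.ucm.fdi.sscheck.prop.tl.DStreamTLProperty
import es.ucm.fdi.sscheck.prop.tl.Formula._

@RunWith(classOf[JUnitRunner])
class CountDemoSpec
  extends org.specs2.Specification
  with DStreamTLProperty
  with org.specs2.ScalaCheck {

  // batch interval of the fresh streaming context created for each test case
  override def batchDuration = Duration(300)
  // checkpointing is only needed for stateful transformations, so keep the default
  override def enableCheckpointing = false

  def is = s2"""DStream.count emits the size of each batch $countProp"""

  def countProp = {
    type U = (RDD[Double], RDD[Long])
    val numBatches = 10
    val batchSize = 30
    forAllDStream(
      // generator: each test case is numBatches batches of batchSize doubles
      Gen.listOfN(numBatches, Gen.listOfN(batchSize, arbitrary[Double])))(
      // transformation under test
      (ds : DStream[Double]) => ds.count)(
      // formula: each output batch holds a single record with the input batch size
      always { (u : U) =>
        val (input, output) = u
        output.first === input.count
      } during numBatches)
  }
}
```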

object TestCaseContext {
  @transient private val logger = Logger(LoggerFactory.getLogger("DStreamTLProperty"))
}
/**
 * Objects of this class define the DStreams involved in the test case execution
 * from the test case and the provided DStream, maintain a formula object, and
 * register an action to evaluate that formula.
 *
 * ssc should be fresh and not started yet
 * */
class TestCaseContext[E1:ClassTag,E2:ClassTag]
  (@transient private val testCaseDstream : Seq[Seq[E1]],
   @transient private val gt1 : (DStream[E1]) => DStream[E2],
   @transient private val formulaNext : NextFormula[(RDD[E1], RDD[E2])])
  (@transient private val ssc : StreamingContext, @transient private val parallelism : Parallelism)
  extends Serializable {

  import TestCaseContext.logger
  type U = (RDD[E1], RDD[E2])

  /** Whether the streaming context has started or not
   * */
  private var started = false

  // -----------------------------------
  // create input and output DStreams
  @transient val inputDStream1 = new com.holdenkarau.spark.testing.TestInputStream[E1](
    ssc.sparkContext, ssc, testCaseDstream, parallelism.numSlices)
  inputDStream1.foreachRDD { (rdd, time) =>
    println(s"""${DStreamProp.msgHeader}
Time: ${time} - InputDStream1 (${rdd.count} records)
${DStreamProp.msgHeader}
${rdd.take(DStreamProp.numSampleRecords).mkString(lineSeparator)}
...""")
  }
  @transient val transformedStream1 = gt1(inputDStream1)
  // note we access transformedStream1 only in slice(), so we need some trivial action
  // on transformedStream1, or we get "org.apache.spark.SparkException:
  // org.apache.spark.streaming.dstream.MappedDStream@459bd6af has not been initialized"
  transformedStream1.foreachRDD { rdd => {} }

  // -----------------------------------
  // Register an action to evaluate the formula
  // currFormula is volatile because it is written by the foreachRDD below and read
  // by DStreamTLProperty.forAllDStream:
  // - only foreachRDD writes to currFormula
  // - DStreamTLProperty.forAllDStream reads currFormula
  // thus using "the cheap read-write lock trick" https://www.ibm.com/developerworks/java/library/j-jtp06197/
  @transient @volatile var currFormula : NextFormula[U] = {
    val currFormulaLock = new Serializable{}
    inputDStream1
      .foreachRDD { (input1Batch, time) =>
        // NOTE: the batch cannot be completed until this code finishes; use a
        // future if needed to avoid blocking the batch completion
        // FIXME: consider whether this synchronization is not already
        // implicitly obtained by DStreamTLProperty.forAllDStream blocking until the batch is completed
        currFormulaLock.synchronized {
          if ((currFormula.result.isEmpty) && ! input1Batch.isEmpty) {
            /* Note currFormula is reset to formulaNext for each test case, but for
             * each test case currFormula only gets to solved state once.
             * */
            val trans1Batch = transformedStream1.slice(time, time).head
            currFormula = currFormula.consume((input1Batch, trans1Batch))
          }
        }
      }
    // this block is just to avoid adding unnecessary fields
    // to this object, as this is part of the constructor
    formulaNext
  }

  // -----------------------------------
  // Synchronization stuff
  // won't wait for each batch for more than batchCompletionTimeout milliseconds
  @transient private val batchInterval = inputDStream1.slideDuration.milliseconds
  @transient private val batchCompletionTimeout = batchInterval * 1000 // give a good margin, values like 5 lead to spurious errors
  // the worker thread uses the SyncVar with a registered StreamingListener
  // that notifies onBatchCompleted.
  // Note: no need to wait for a receiver, as there is no receiver
  @transient private val onBatchCompletedSyncVar = new SyncVar[Unit]
  ssc.addStreamingListener(new StreamingListener {
    override def onBatchCompleted(batchCompleted : StreamingListenerBatchCompleted) : Unit = {
      // signal the property about the completion of a new batch
      if (! onBatchCompletedSyncVar.isSet) {
        // note only this thread makes puts, so there is no problem with concurrency
        onBatchCompletedSyncVar.put(())
      }
    }
  })

  // -----------------------------------
  // now that everything is ready we start the streaming context
  ssc.start()
  started = true

  def waitForBatch() : Unit = {
    Try {
      onBatchCompletedSyncVar.take(batchCompletionTimeout)
    } match {
      case Success(_) => {}
      case Failure(_) => {
        val e = TestCaseTimeoutException(batchInterval = batchInterval,
                                         batchCompletionTimeout = batchCompletionTimeout)
        logger.error(e.getMessage) // FIXME should be private
        Try { ssc.stop(stopSparkContext = false, stopGracefully = false) }
        // This exception will make the test case fail; the failing test case is
        // not important here, as this is a performance problem, not
        // a counterexample that has been found
        throw e
      }
    }
  }

  /** Stops the internal streaming context, if it is running
   *
   * TODO: consider moving this to DStreamTLProperty
   * */
  def stop() : Unit = {
    if (started) {
      Try {
        logger.warn("stopping test Spark Streaming context")
        ssc.stop(stopSparkContext = false, stopGracefully = true)
        started = false
      } recover {
        case _ => {
          logger.warn("second attempt forcing stop of test Spark Streaming context")
          ssc.stop(stopSparkContext = false, stopGracefully = false)
          started = false
        }
      }
    }
  }

}
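The `currFormula` field above relies on the "cheap read-write lock trick" from the linked developerWorks article: writes are serialized with a lock, while reads pay only the cost of a volatile read. A standalone sketch of the pattern, independent of Spark and of this commit:

```scala
// Cheap read-write lock trick: @volatile guarantees readers see the latest
// write without locking; the lock only serializes the writers' read-modify-write.
class CheapReadWriteLock {
  @volatile private var value : Int = 0
  private val lock = new Object

  // read path: lock-free, just a volatile read
  def get : Int = value

  // write path: synchronized, so concurrent increments do not lose updates
  def increment() : Unit = lock.synchronized {
    value = value + 1
  }
}
```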
@@ -12,14 +12,14 @@ import org.apache.spark.streaming.Duration
import org.apache.spark.streaming.dstream.DStream

import es.ucm.fdi.sscheck.spark.streaming.SharedStreamingContextBeforeAfterEach
-import es.ucm.fdi.sscheck.prop.tl.{Formula,DStreamProp}
+import es.ucm.fdi.sscheck.prop.tl.{Formula,DStreamTLProperty}
import es.ucm.fdi.sscheck.prop.tl.Formula._
import es.ucm.fdi.sscheck.gen.{PDStreamGen,BatchGen}

@RunWith(classOf[JUnitRunner])
class StreamingFormulaDemo1
  extends Specification
-  with SharedStreamingContextBeforeAfterEach
+  with DStreamTLProperty
  with ResultMatchers
  with ScalaCheck {

@@ -49,7 +49,7 @@ class StreamingFormulaDemo1
    } during numBatches
    val gen = BatchGen.always(BatchGen.ofN(50, arbitrary[Double]), numBatches)

-    DStreamProp.forAll(
+    forAllDStream(
      gen)(
      testSubject)(
      formula)
@@ -16,7 +16,7 @@ import org.apache.spark.streaming.dstream.DStream._
import scalaz.syntax.std.boolean._

import es.ucm.fdi.sscheck.spark.streaming.SharedStreamingContextBeforeAfterEach
-import es.ucm.fdi.sscheck.prop.tl.{Formula,DStreamProp}
+import es.ucm.fdi.sscheck.prop.tl.{Formula,DStreamTLProperty}
import es.ucm.fdi.sscheck.prop.tl.Formula._
import es.ucm.fdi.sscheck.gen.{PDStreamGen,BatchGen}
import es.ucm.fdi.sscheck.gen.BatchGenConversions._
@@ -26,7 +26,7 @@ import es.ucm.fdi.sscheck.matcher.specs2.RDDMatchers._
@RunWith(classOf[JUnitRunner])
class StreamingFormulaDemo2
  extends Specification
-  with SharedStreamingContextBeforeAfterEach
+  with DStreamTLProperty
  with ResultMatchers
  with ScalaCheck {

@@ -77,7 +77,7 @@ class StreamingFormulaDemo2
    ( always { badInput ==> (always(badIdBanned) during nestedTimeout) } during tailTimeout )
  }

-  DStreamProp.forAll(
+  forAllDStream(
    gen)(
    testSubject)(
    formula)
@@ -13,14 +13,14 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Duration}
import org.apache.spark.streaming.dstream.DStream

-import es.ucm.fdi.sscheck.prop.tl.DStreamProp
import es.ucm.fdi.sscheck.prop.tl.Formula._
+import es.ucm.fdi.sscheck.prop.tl.DStreamTLProperty
import es.ucm.fdi.sscheck.matcher.specs2.RDDMatchers._

@RunWith(classOf[JUnitRunner])
class ScalaCheckStreamingTest
  extends org.specs2.Specification
-  with SharedStreamingContextBeforeAfterEach
+  with DStreamTLProperty
  with org.specs2.matcher.ResultMatchers
  with ScalaCheck {

@@ -46,7 +46,7 @@
  }
  type U = (RDD[Int], RDD[Int])

-  DStreamProp.forAll(
+  forAllDStream(
    "inputDStream" |: dsgenSeqSeq1)(
    (inputDs : DStream[Int]) => {
      val transformedDs = inputDs.map(_+1)
@@ -68,7 +68,7 @@
  def countProp(testSubject : DStream[Double] => DStream[Long]) = {
    type U = (RDD[Double], RDD[Long])
    val numBatches = 10
-    DStreamProp.forAll(
+    forAllDStream(
      Gen.listOfN(numBatches, Gen.listOfN(30, arbitrary[Double])))(
      testSubject
    )(always ((u : U) => {
