
Setup

sscheck has been tested with Scala 2.10.6 and 2.11.8, and Spark 1.6.2.

See the scaladoc for the API reference. Example properties are shown in the sections below.

Maven dependency

The artifact is hosted at Bintray. To use it from Maven, see the instructions at that link. For sbt 0.13.6+, add the following to your build.sbt:

lazy val sscheckVersion = "0.3.2"
libraryDependencies += "es.ucm.fdi" %% "sscheck" % sscheckVersion
resolvers += Resolver.bintrayRepo("juanrh", "maven")

replacing sscheckVersion with a suitable value. You can also take a look at https://github.com/juanrh/sscheck-examples for an example project that uses sscheck.

sscheck is also indexed in Spark Packages.

Using ScalaCheck with Spark Streaming

With our library you can use a variant of linear temporal logic to specify ScalaCheck properties of Spark Streaming programs. The simplest thing you can do is to require a predicate to hold for all the batches of the generated test case DStreams, as in this property:

// Extend the trait DStreamTLProperty and a Specs2 specification trait
// to use forAllDStream for defining a Specs2 example as a temporal property.
def countForallAlwaysProp(testSubject: DStream[Double] => DStream[Long]) = {
  type U = (RDD[Double], RDD[Long])
  val (inBatch, transBatch) = ((_: U)._1, (_: U)._2)
  val numBatches = 10
  // in every batch, the transformed RDD contains exactly one record: the
  // number of records of the corresponding input RDD
  val formula: Formula[U] = always { (u: U) =>
    transBatch(u).count === 1 and
    inBatch(u).count === transBatch(u).first
  } during numBatches
  // each generated batch contains between 10 and 50 arbitrary Doubles
  val gen = BatchGen.always(BatchGen.ofNtoM(10, 50, arbitrary[Double]), numBatches)

  forAllDStream(gen)(testSubject)(formula)
}
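To run this property as a Specs2 example, mix DStreamTLProperty into an acceptance specification and fix the batch interval that defines logical time. The following is a minimal sketch for the versions listed above; the class name, the batchDuration value, and the choice of DStream.count as test subject are assumptions of this example, not part of sscheck:

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.Duration
import org.apache.spark.streaming.dstream.DStream
import org.scalacheck.Arbitrary.arbitrary
import org.specs2.{ScalaCheck, Specification}
import es.ucm.fdi.sscheck.prop.tl.{DStreamTLProperty, Formula}
import es.ucm.fdi.sscheck.prop.tl.Formula._
import es.ucm.fdi.sscheck.gen.BatchGen

// hypothetical specification: name and configuration values are just examples
class CountSpec extends Specification with DStreamTLProperty with ScalaCheck {
  // length of the batch interval, i.e. of each logical time instant, in ms
  override def batchDuration = Duration(300)
  // number of partitions of the generated RDDs
  override def defaultParallelism = 3

  def is = s2"""
    DStream.count counts the records of each batch $countExample
  """

  // countForallAlwaysProp as defined above, pasted into this class body
  def countExample = countForallAlwaysProp(_.count)
}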

But things get more interesting when you use temporal operators to express dependencies between events. In this other property we generate random DStreams of (userId, boolean) pairs, where the boolean is false when the user performed a malicious action at that moment. The property specifies a transformation of that input DStream into an output DStream containing the ids of banned users, i.e. of users that have been malicious at some previous moment in time. For that we use:

  • a generator that produces good batches, in which no malicious behaviour happens, until a bad batch for a particular malicious id occurs; after that, either good or bad batches are generated.
  • a property that states:
    1. we always get good inputs, until the malicious id is banned
    2. each time we find the malicious id, it is banned for some time

def checkExtractBannedUsersList(testSubject: DStream[(UserId, Boolean)] => DStream[UserId]) = {
  val batchSize = 20
  val (headTimeout, tailTimeout, nestedTimeout) = (10, 10, 5)
  val (badId, ids) = (15L, Gen.choose(1L, 50L))
  val goodBatch = BatchGen.ofN(batchSize, ids.map((_, true)))
  val badBatch = goodBatch + BatchGen.ofN(1, (badId, false))
  // good batches until a bad batch occurs within headTimeout batches,
  // followed by a mix of good and bad batches
  val gen = BatchGen.until(goodBatch, badBatch, headTimeout) ++
            BatchGen.always(Gen.oneOf(goodBatch, badBatch), tailTimeout)

  type U = (RDD[(UserId, Boolean)], RDD[UserId])
  val (inBatch, outBatch) = ((_: U)._1, (_: U)._2)

  val formula: Formula[U] = {
    val badInput: Formula[U] = at(inBatch)(_ should existsRecord(_ == (badId, false)))
    val allGoodInputs: Formula[U] = at(inBatch)(_ should foreachRecord(_._2 == true))
    val badIdBanned: Formula[U] = at(outBatch)(_ should existsRecord(_ == badId))

    // 1. all inputs are good until the malicious id is banned
    // 2. each time the malicious id appears, it stays banned for a while
    (allGoodInputs until badIdBanned on headTimeout) and
    (always { badInput ==> (always(badIdBanned) during nestedTimeout) } during tailTimeout)
  }

  forAllDStream(gen)(testSubject)(formula)
}
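The property fixes the expected behaviour but not the implementation: any transformation from (UserId, Boolean) pairs to banned user ids can be plugged in as testSubject. As an illustration only (this is not sscheck API, and it assumes type UserId = Long, which is consistent with badId = 15L above), a stateful implementation could ban an id forever after its first malicious action:

import org.apache.spark.streaming.dstream.DStream

type UserId = Long // assumption consistent with badId = 15L above

// Possible subject under test: a user id is emitted as banned in every
// batch after its first malicious (false) action.
// Note: updateStateByKey requires checkpointing to be enabled on the
// streaming context.
def statefulBannedUsers(users: DStream[(UserId, Boolean)]): DStream[UserId] =
  users.updateStateByKey { (actions: Seq[Boolean], banned: Option[Unit]) =>
    // keep the banned mark once set, or set it on the first malicious action
    if (banned.isDefined || actions.contains(false)) Some(()) else None
  }.map(_._1)

With this implementation, once badIdBanned holds it holds forever, which in particular satisfies the nestedTimeout clause of the formula.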

Instead of using actual clock time, as is done for example by the future matchers in specs2, we consider the logical time defined by the batch interval. Each discrete time instant corresponds to the interval from the start to the end of a batch, and for each DStream we can see the RDD at that instant, as if it had been computed instantaneously. In practice the computation is not instantaneous, but the synchronization performed by Spark Streaming makes it look that way, at least when enough computing resources are available and task scheduling is not delayed too much. This way a timeout in a generator or in a property specifies a number of batches to wait for something to happen.
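Schematically, reusing the names of the banned-users property above (no new API, just the two places a timeout can appear):

// timeout in a generator: the bad batch is generated within headTimeout
// batches, so every generated test case resolves the "until" in finite time
val gen = BatchGen.until(goodBatch, badBatch, headTimeout)

// timeout in a formula: badIdBanned has to hold within headTimeout batch
// intervals, otherwise the test case fails
val untilBanned = allGoodInputs until badIdBanned on headTimeout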

As the test cases are finite (otherwise ScalaCheck wouldn't be able to complete the execution of the tests!), timeouts can be specified both in the data generators and in the formulas, to ensure events occur in a limited time. Other references:

  • ScalaMAD meetup sscheck video

Using ScalaCheck with Spark core

Example property:

def forallRDDGenOfNFreqMean = {
  // generate the value 0 with frequency 1, and the value 1 with frequency 4
  val freqs = Map(1 -> 0, 4 -> 1)
  val rddSize = 300
  val gRDDFreq = RDDGen.ofN(rddSize, Gen.frequency(freqs.mapValues(Gen.const(_)).toSeq: _*))
  // weighted mean of the generated values: (1*0 + 4*1) / (1 + 4) = 0.8
  val expectedMean = {
    val freqS = freqs.toSeq
    val num = freqS.map { case (f, v) => v * f }.sum
    val den = freqS.map(_._1).sum
    num / den.toDouble
  }
  Prop.forAll("rdd" |: gRDDFreq) { rdd: RDD[Int] =>
    rdd.mean must be ~(expectedMean +/- 0.1)
  }
}.set(minTestsOk = 50).verbose
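This property needs a SparkContext in scope for RDDGen. The following skeleton of the enclosing specification assumes that sscheck's SharedSparkContextBeforeAfterAll trait is what provides that shared context; the class name is hypothetical:

import org.specs2.{ScalaCheck, Specification}
import es.ucm.fdi.sscheck.spark.SharedSparkContextBeforeAfterAll

// hypothetical specification hosting the property above
class RDDGenSpec extends Specification
  with SharedSparkContextBeforeAfterAll with ScalaCheck {

  def is = s2"""
    an RDD generated with frequencies has the expected mean $forallRDDGenOfNFreqMean
  """

  // paste the forallRDDGenOfNFreqMean definition above into this class body,
  // so that RDDGen picks up the SparkContext managed by the trait
}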

More details in this blog post.

Related references