Skip to content

Commit

Permalink
Update PR in response to comments.
Browse files Browse the repository at this point in the history
This PR does a few things:

 * Renames Stream/StreamT to Streaming/StreamingT
 * Sets up some law-checking for Stream and StreamingT
 * Adds some type class instances
 * Fixes Stream#filter bug

One issue which was pointed out in the PR is that some
methods on StreamingT[F, ?] require F to be trampolined.
For types like Option which are not trampolined, this
will result in StackOverflowExceptions (and thus failed
law-checking) during some operations.

I don't have a 100% satisfactory answer to this right
now. I'd love to come up with a design that is completely
safe but don't see how it can be done in general.

Still left to do:

 * Update documentation
 * Comprehensive type class definitions for Streaming/StreamingT.
 * Do some benchmarking on Streaming/StreamingT overhead.
 * More unit tests for Streaming/StreamingT
  • Loading branch information
non committed Aug 20, 2015
1 parent 95a3852 commit 8f7a110
Show file tree
Hide file tree
Showing 9 changed files with 258 additions and 107 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,17 @@ import scala.reflect.ClassTag
import scala.annotation.tailrec
import scala.collection.mutable

sealed abstract class StreamT[F[_], A] { lhs =>
sealed abstract class StreamingT[F[_], A] { lhs =>

import StreamT.{Empty, Next, This}
import StreamingT.{Empty, Next, This}

/**
* Deconstruct a stream into a head and tail (if available).
*
* This method will evaluate the stream until it finds a head and
* tail, or until the stream is exhausted.
*/
def uncons(implicit ev: Monad[F]): F[Option[(A, StreamT[F, A])]] =
def uncons(implicit ev: Monad[F]): F[Option[(A, StreamingT[F, A])]] =
this match {
case Empty() => ev.pure(None)
case Next(ft) => ft.flatMap(_.uncons)
Expand All @@ -28,7 +28,7 @@ sealed abstract class StreamT[F[_], A] { lhs =>
/**
* Lazily transform the stream given a function `f`.
*/
def map[B](f: A => B)(implicit ev: Functor[F]): StreamT[F, B] =
def map[B](f: A => B)(implicit ev: Functor[F]): StreamingT[F, B] =
this match {
case Empty() => Empty()
case Next(ft) => Next(ft.map(_.map(f)))
Expand All @@ -38,7 +38,7 @@ sealed abstract class StreamT[F[_], A] { lhs =>
/**
* Lazily transform the stream given a function `f`.
*/
def flatMap[B](f: A => StreamT[F, B])(implicit ev: Functor[F]): StreamT[F, B] =
def flatMap[B](f: A => StreamingT[F, B])(implicit ev: Functor[F]): StreamingT[F, B] =
this match {
case Empty() => Empty()
case Next(ft) => Next(ft.map(_.flatMap(f)))
Expand All @@ -48,7 +48,7 @@ sealed abstract class StreamT[F[_], A] { lhs =>
/**
* Lazily filter the stream given the predicate `f`.
*/
def filter(f: A => Boolean)(implicit ev: Functor[F]): StreamT[F, A] =
def filter(f: A => Boolean)(implicit ev: Functor[F]): StreamingT[F, A] =
this match {
case Empty() => this
case Next(ft) => Next(ft.map(_.filter(f)))
Expand Down Expand Up @@ -86,7 +86,7 @@ sealed abstract class StreamT[F[_], A] { lhs =>
/**
* Lazily concatenate two streams.
*/
def concat(rhs: StreamT[F, A])(implicit ev: Functor[F]): StreamT[F, A] =
def concat(rhs: StreamingT[F, A])(implicit ev: Functor[F]): StreamingT[F, A] =
this match {
case Empty() => rhs
case Next(ft) => Next(ft.map(_ concat rhs))
Expand All @@ -98,7 +98,7 @@ sealed abstract class StreamT[F[_], A] { lhs =>
*
* In this case the evaluation of the second stream may be deferred.
*/
def concat(rhs: F[StreamT[F, A]])(implicit ev: Functor[F]): StreamT[F, A] =
def concat(rhs: F[StreamingT[F, A]])(implicit ev: Functor[F]): StreamingT[F, A] =
this match {
case Empty() => Next(rhs)
case Next(ft) => Next(ft.map(_ concat rhs))
Expand All @@ -111,7 +111,7 @@ sealed abstract class StreamT[F[_], A] { lhs =>
* The lenght of the result will be the shorter of the two
* arguments.
*/
def zip[B](rhs: StreamT[F, B])(implicit ev: Monad[F]): StreamT[F, (A, B)] =
def zip[B](rhs: StreamingT[F, B])(implicit ev: Monad[F]): StreamingT[F, (A, B)] =
Next(for {
lo <- lhs.uncons; ro <- rhs.uncons
} yield (lo, ro) match {
Expand All @@ -127,7 +127,7 @@ sealed abstract class StreamT[F[_], A] { lhs =>
* Unlike `zip`, the length of the result will be the longer of the
* two arguments.
*/
def izip[B](rhs: StreamT[F, B])(implicit ev: Monad[F]): StreamT[F, Ior[A, B]] =
def izip[B](rhs: StreamingT[F, B])(implicit ev: Monad[F]): StreamingT[F, Ior[A, B]] =
Next(for {
lo <- lhs.uncons; ro <- rhs.uncons
} yield (lo, ro) match {
Expand Down Expand Up @@ -170,7 +170,7 @@ sealed abstract class StreamT[F[_], A] { lhs =>
* If the current stream has `n` or fewer elements, the entire
* stream will be returned.
*/
def take(n: Int)(implicit ev: Functor[F]): StreamT[F, A] =
def take(n: Int)(implicit ev: Functor[F]): StreamingT[F, A] =
if (n <= 0) Empty() else this match {
case Empty() => Empty()
case Next(ft) => Next(ft.map(_.take(n)))
Expand All @@ -184,7 +184,7 @@ sealed abstract class StreamT[F[_], A] { lhs =>
* If the current stream has `n` or fewer elements, an empty stream
* will be returned.
*/
def drop(n: Int)(implicit ev: Functor[F]): StreamT[F, A] =
def drop(n: Int)(implicit ev: Functor[F]): StreamingT[F, A] =
if (n <= 0) this else this match {
case Empty() => Empty()
case Next(ft) => Next(ft.map(_.drop(n)))
Expand All @@ -206,7 +206,7 @@ sealed abstract class StreamT[F[_], A] { lhs =>
*
* Will result in: Stream(1, 2, 3)
*/
def takeWhile(f: A => Boolean)(implicit ev: Functor[F]): StreamT[F, A] =
def takeWhile(f: A => Boolean)(implicit ev: Functor[F]): StreamingT[F, A] =
this match {
case Empty() => Empty()
case Next(ft) => Next(ft.map(_.takeWhile(f)))
Expand All @@ -228,7 +228,7 @@ sealed abstract class StreamT[F[_], A] { lhs =>
*
* Will result in: Stream(4, 5, 6, 7)
*/
def dropWhile(f: A => Boolean)(implicit ev: Functor[F]): StreamT[F, A] =
def dropWhile(f: A => Boolean)(implicit ev: Functor[F]): StreamingT[F, A] =
this match {
case Empty() => Empty()
case Next(ft) => Next(ft.map(_.dropWhile(f)))
Expand Down Expand Up @@ -265,7 +265,7 @@ sealed abstract class StreamT[F[_], A] { lhs =>
}
}

object StreamT {
object StreamingT {

/**
* Concrete Stream[A] types:
Expand All @@ -279,23 +279,23 @@ object StreamT {
* and Always). The head of `This` is eager -- a lazy head can be
* represented using `Next(Always(...))` or `Next(Later(...))`.
*/
case class Empty[F[_], A]() extends StreamT[F, A]
case class Next[F[_], A](next: F[StreamT[F, A]]) extends StreamT[F, A]
case class This[F[_], A](a: A, tail: F[StreamT[F, A]]) extends StreamT[F, A]
case class Empty[F[_], A]() extends StreamingT[F, A]
case class Next[F[_], A](next: F[StreamingT[F, A]]) extends StreamingT[F, A]
case class This[F[_], A](a: A, tail: F[StreamingT[F, A]]) extends StreamingT[F, A]

/**
* Create an empty stream of type A.
*/
def empty[F[_], A]: StreamT[F, A] =
def empty[F[_], A]: StreamingT[F, A] =
Empty()

/**
* Create a stream consisting of a single value.
*/
def apply[F[_], A](a: A)(implicit ev: Applicative[F]): StreamT[F, A] =
def apply[F[_], A](a: A)(implicit ev: Applicative[F]): StreamingT[F, A] =
This(a, ev.pure(Empty()))

def cons[F[_], A](a: A, fs: F[StreamT[F, A]]): StreamT[F, A] =
def cons[F[_], A](a: A, fs: F[StreamingT[F, A]]): StreamingT[F, A] =
This(a, fs)

/**
Expand All @@ -304,9 +304,17 @@ object StreamT {
* None represents an empty stream. Some(a) reprsents an initial
* element, and we can compute the tail (if any) via f(a).
*/
def unfold[F[_], A](o: Option[A])(f: A => F[Option[A]])(implicit ev: Functor[F]): StreamT[F, A] =
def unfold[F[_], A](o: Option[A])(f: A => F[Option[A]])(implicit ev: Functor[F]): StreamingT[F, A] =
o match {
case None => Empty()
case Some(a) => This(a, f(a).map(o => unfold(o)(f)))
}

implicit def streamTMonad[F[_]: Applicative]: Monad[StreamingT[F, ?]] =
new Monad[StreamingT[F, ?]] {
def pure[A](a: A): StreamingT[F, A] =
StreamingT(a)
def flatMap[A, B](fa: StreamingT[F, A])(f: A => StreamingT[F, B]): StreamingT[F, B] =
fa.flatMap(f)
}
}
5 changes: 4 additions & 1 deletion laws/shared/src/main/scala/cats/laws/MonadFilterLaws.scala
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package cats
package laws

import cats.syntax.flatMap._
import cats.syntax.all._

/**
* Laws that must be obeyed by any `MonadFilter`.
Expand All @@ -14,6 +14,9 @@ trait MonadFilterLaws[F[_]] extends MonadLaws[F] {

def monadFilterRightEmpty[A, B](fa: F[A]): IsEq[F[B]] =
fa.flatMap(_ => F.empty[B]) <-> F.empty[B]

def monadFilterConsistency[A, B](fa: F[A], f: A => Boolean): IsEq[F[A]] =
fa.filter(f) <-> fa.flatMap(a => if (f(a)) F.pure(a) else F.empty)
}

object MonadFilterLaws {
Expand Down
10 changes: 10 additions & 0 deletions laws/shared/src/main/scala/cats/laws/discipline/Arbitrary.scala
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,14 @@ object arbitrary {
A.arbitrary.map(a => Eval.now(a)),
A.arbitrary.map(a => Eval.later(a)),
A.arbitrary.map(a => Eval.always(a))))

import cats.data.{Streaming, StreamingT}

implicit def streamingArbitrary[A](implicit A: Arbitrary[A]): Arbitrary[Streaming[A]] =
Arbitrary(Gen.listOf(A.arbitrary).map(Streaming.fromList))

implicit def streamKArbitrary[F[_], A](implicit F: Monad[F], A: Arbitrary[A]): Arbitrary[StreamingT[F, A]] =
Arbitrary(for {
as <- Gen.listOf(A.arbitrary)
} yield as.foldLeft(StreamingT.empty[F, A])((s, a) => StreamingT.This(a, F.pure(s))))
}
Original file line number Diff line number Diff line change
Expand Up @@ -107,4 +107,11 @@ object ArbitraryK {

implicit def optionT[F[_]](implicit F: ArbitraryK[F]): ArbitraryK[OptionT[F, ?]] =
new ArbitraryK[OptionT[F, ?]] { def synthesize[A: Arbitrary]: Arbitrary[OptionT[F, A]] = implicitly }

import cats.data.{Streaming, StreamingT}
implicit val streaming: ArbitraryK[Streaming] =
new ArbitraryK[Streaming] { def synthesize[A: Arbitrary]: Arbitrary[Streaming[A]] = implicitly }

implicit def streamT[F[_]: Monad]: ArbitraryK[StreamingT[F, ?]] =
new ArbitraryK[StreamingT[F, ?]] { def synthesize[A: Arbitrary]: Arbitrary[StreamingT[F, A]] = implicitly }
}
9 changes: 9 additions & 0 deletions laws/shared/src/main/scala/cats/laws/discipline/Eq.scala
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,13 @@ object eq {
}
}

import cats.data.StreamingT

implicit def streamTEq[F[_]: EqK: Monad, A: Eq]: Eq[StreamingT[F, A]] =
new Eq[StreamingT[F, A]] {
def eqv(lhs: StreamingT[F, A], rhs: StreamingT[F, A]): Boolean = {
val e = EqK[F].synthesize[List[A]](EqK[List].synthesize[A])
e.eqv(lhs.toList, rhs.toList)
}
}
}
10 changes: 10 additions & 0 deletions laws/shared/src/main/scala/cats/laws/discipline/EqK.scala
Original file line number Diff line number Diff line change
Expand Up @@ -84,4 +84,14 @@ object EqK {

implicit val vector: EqK[Vector] =
new EqK[Vector] { def synthesize[A: Eq]: Eq[Vector[A]] = implicitly }

import cats.data.{Streaming, StreamingT}
implicit val streaming: EqK[Streaming] =
new EqK[Streaming] { def synthesize[A: Eq]: Eq[Streaming[A]] = implicitly }

implicit def streamT[F[_]: EqK: Monad]: EqK[StreamingT[F, ?]] =
new EqK[StreamingT[F, ?]] {
def synthesize[A: Eq]: Eq[StreamingT[F, A]] =
cats.laws.discipline.eq.streamTEq[F, A](EqK[F], Monad[F], Eq[A])
}
}
11 changes: 11 additions & 0 deletions tests/shared/src/test/scala/cats/tests/StreamingTTests.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package cats
package tests

import cats.data.StreamingT
import cats.laws.discipline.{EqK, MonadTests, SerializableTests}

class StreamingTTests extends CatsSuite {
implicit val e: Eq[StreamingT[Eval, Int]] = EqK[StreamingT[Eval, ?]].synthesize[Int]
checkAll("StreamingT[Eval, ?]", MonadTests[StreamingT[Eval, ?]].monad[Int, Int, Int])
checkAll("Monad[StreamingT[Eval, ?]]", SerializableTests.serializable(Monad[StreamingT[Eval, ?]]))
}
17 changes: 17 additions & 0 deletions tests/shared/src/test/scala/cats/tests/StreamingTests.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package cats
package tests

import cats.data.Streaming
import cats.laws.discipline.arbitrary._
import cats.laws.discipline.{TraverseTests, CoflatMapTests, MonadCombineTests, SerializableTests}

class StreamingTests extends CatsSuite {
checkAll("Streaming[Int]", CoflatMapTests[Streaming].coflatMap[Int, Int, Int])
checkAll("CoflatMap[Streaming]", SerializableTests.serializable(CoflatMap[Streaming]))

checkAll("Streaming[Int]", MonadCombineTests[Streaming].monadCombine[Int, Int, Int])
checkAll("MonadCombine[Streaming]", SerializableTests.serializable(MonadCombine[Streaming]))

checkAll("Streaming[Int] with Option", TraverseTests[Streaming].traverse[Int, Int, Int, Int, Option, Option])
checkAll("Traverse[Streaming]", SerializableTests.serializable(Traverse[Streaming]))
}

0 comments on commit 8f7a110

Please sign in to comment.