Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgrade to cats-effect, fs2 3.0.0-M3 builds #80

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 21 additions & 75 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,48 +32,21 @@ jobs:
fetch-depth: 0

- name: Setup Java and Scala
uses: olafurpg/setup-scala@v5
uses: olafurpg/setup-scala@v10
with:
java-version: ${{ matrix.java }}

- name: Cache ivy2
uses: actions/cache@v1
with:
path: ~/.ivy2/cache
key: ${{ runner.os }}-sbt-ivy-cache-${{ hashFiles('**/*.sbt') }}-${{ hashFiles('project/build.properties') }}

- name: Cache coursier (generic)
uses: actions/cache@v1
with:
path: ~/.coursier/cache/v1
key: ${{ runner.os }}-generic-sbt-coursier-cache-${{ hashFiles('**/*.sbt') }}-${{ hashFiles('project/build.properties') }}

- name: Cache coursier (linux)
if: contains(runner.os, 'linux')
uses: actions/cache@v1
with:
path: ~/.cache/coursier/v1
key: ${{ runner.os }}-sbt-coursier-cache-${{ hashFiles('**/*.sbt') }}-${{ hashFiles('project/build.properties') }}

- name: Cache coursier (macOS)
if: contains(runner.os, 'macos')
uses: actions/cache@v1
with:
path: ~/Library/Caches/Coursier/v1
key: ${{ runner.os }}-sbt-coursier-cache-${{ hashFiles('**/*.sbt') }}-${{ hashFiles('project/build.properties') }}

- name: Cache coursier (windows)
if: contains(runner.os, 'windows')
uses: actions/cache@v1
with:
path: ~/AppData/Local/Coursier/Cache/v1
key: ${{ runner.os }}-sbt-coursier-cache-${{ hashFiles('**/*.sbt') }}-${{ hashFiles('project/build.properties') }}

- name: Cache sbt
uses: actions/cache@v1
uses: actions/cache@v2
with:
path: ~/.sbt
key: ${{ runner.os }}-sbt-cache-${{ hashFiles('**/*.sbt') }}-${{ hashFiles('project/build.properties') }}
path: |
~/.sbt
~/.ivy2/cache
~/.coursier/cache/v1
~/.cache/coursier/v1
~/AppData/Local/Coursier/Cache/v1
~/Library/Caches/Coursier/v1
key: ${{ runner.os }}-sbt-cache-v2-${{ hashFiles('**/*.sbt') }}-${{ hashFiles('project/build.properties') }}

- name: Check that workflows are up to date
run: sbt ++${{ matrix.scala }} githubWorkflowCheck
Expand Down Expand Up @@ -107,48 +80,21 @@ jobs:
fetch-depth: 0

- name: Setup Java and Scala
uses: olafurpg/setup-scala@v5
uses: olafurpg/setup-scala@v10
with:
java-version: ${{ matrix.java }}

- name: Cache ivy2
uses: actions/cache@v1
with:
path: ~/.ivy2/cache
key: ${{ runner.os }}-sbt-ivy-cache-${{ hashFiles('**/*.sbt') }}-${{ hashFiles('project/build.properties') }}

- name: Cache coursier (generic)
uses: actions/cache@v1
with:
path: ~/.coursier/cache/v1
key: ${{ runner.os }}-generic-sbt-coursier-cache-${{ hashFiles('**/*.sbt') }}-${{ hashFiles('project/build.properties') }}

- name: Cache coursier (linux)
if: contains(runner.os, 'linux')
uses: actions/cache@v1
with:
path: ~/.cache/coursier/v1
key: ${{ runner.os }}-sbt-coursier-cache-${{ hashFiles('**/*.sbt') }}-${{ hashFiles('project/build.properties') }}

- name: Cache coursier (macOS)
if: contains(runner.os, 'macos')
uses: actions/cache@v1
with:
path: ~/Library/Caches/Coursier/v1
key: ${{ runner.os }}-sbt-coursier-cache-${{ hashFiles('**/*.sbt') }}-${{ hashFiles('project/build.properties') }}

- name: Cache coursier (windows)
if: contains(runner.os, 'windows')
uses: actions/cache@v1
with:
path: ~/AppData/Local/Coursier/Cache/v1
key: ${{ runner.os }}-sbt-coursier-cache-${{ hashFiles('**/*.sbt') }}-${{ hashFiles('project/build.properties') }}

- name: Cache sbt
uses: actions/cache@v1
with:
path: ~/.sbt
key: ${{ runner.os }}-sbt-cache-${{ hashFiles('**/*.sbt') }}-${{ hashFiles('project/build.properties') }}
uses: actions/cache@v2
with:
path: |
~/.sbt
~/.ivy2/cache
~/.coursier/cache/v1
~/.cache/coursier/v1
~/AppData/Local/Coursier/Cache/v1
~/Library/Caches/Coursier/v1
key: ${{ runner.os }}-sbt-cache-v2-${{ hashFiles('**/*.sbt') }}-${{ hashFiles('project/build.properties') }}

- name: Download target directories (2.12.12)
uses: actions/download-artifact@v2
Expand Down
10 changes: 5 additions & 5 deletions project/Version.scala
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
object Version {
val Akka = "2.6.9"
val Akka = "2.6.10"
val Camel = "2.20.4"
val CatsEffect = "2.2.0"
val Config = "1.4.0"
val Fs2 = "2.4.4"
val CatsEffect = "3.3.1"
val Fs2 = "3.2.4"
val JUnitInterface = "0.11"
val Log4j = "2.13.0"
val ScalaCollectionCompat = "2.2.0"
val Scalatest = "3.2.0"
val ScalaCollectionCompat = "2.3.0"
val Scalatest = "3.2.3"
val Silencer = "1.7.1"
}
2 changes: 1 addition & 1 deletion project/build.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
sbt.version=1.4.0
sbt.version=1.4.3
4 changes: 2 additions & 2 deletions project/plugins.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ addSbtPlugin("de.heikoseeberger" % "sbt-header" % "5.6.0")

addSbtPlugin("org.scalariform" % "sbt-scalariform" % "1.8.3")

addSbtPlugin("io.github.davidgregory084" % "sbt-tpolecat" % "0.1.13")
addSbtPlugin("io.github.davidgregory084" % "sbt-tpolecat" % "0.1.15")

addSbtPlugin("com.codecommit" % "sbt-github-actions" % "0.9.3")
addSbtPlugin("com.codecommit" % "sbt-github-actions" % "0.9.5")

addSbtPlugin("ch.epfl.scala" % "sbt-scalafix" % "0.9.21")

Expand Down
24 changes: 10 additions & 14 deletions streamz-camel-fs2/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@ Camel DSL for FS2
-----------------

[Apache Camel endpoints](http://camel.apache.org/components.html) can be integrated into [FS2](https://github.com/functional-streams-for-scala/fs2) applications with a [DSL](#dsl).

### Dependencies

The DSL is provided by the `streamz-camel-fs2` artifact which is available for Scala 2.11 and 2.12:

resolvers += Resolver.bintrayRepo("krasserm", "maven")

libraryDependencies += "com.github.krasserm" %% "streamz-camel-fs2" % "0.10-M2"

### Configuration

The consumer receive timeout on Camel endpoints defaults to 500 ms. If you need to change that, you can do so in `application.conf`:
Expand All @@ -31,11 +31,11 @@ Its usage requires an implicit [`StreamContext`](http://krasserm.github.io/strea
```scala
import streamz.camel.StreamContext

// contains an internally managed CamelContext
// contains an internally managed CamelContext
implicit val streamContext: StreamContext = StreamContext()
```

Applications that want to re-use an existing, externally managed `CamelContext` should create a `StreamContext` with `StreamContext(camelContext: CamelContext)`:
Applications that want to re-use an existing, externally managed `CamelContext` should create a `StreamContext` with `StreamContext(camelContext: CamelContext)`:

```scala
import org.apache.camel.CamelContext
Expand All @@ -49,7 +49,7 @@ implicit val streamContext: StreamContext = StreamContext(camelContext)
```
A `StreamContext` internally manages an `executorService` for running blocking endpoint operations. Applications can configure a custom executor service by providing an `executorServiceFactory` during `StreamContext` creation. See [API docs](http://krasserm.github.io/streamz/scala-2.12/unidoc/streamz/camel/StreamContext$.html) for details.

After usage, a `StreamContext` should be stopped with `streamContext.stop()`.
After usage, a `StreamContext` should be stopped with `streamContext.stop()`.

#### Receiving in-only message exchanges from an endpoint

Expand All @@ -58,8 +58,8 @@ An FS2 stream that emits messages consumed from a Camel endpoint can be created
```scala
import cats.effect.IO
import fs2.Stream
import streamz.camel.StreamContext
import streamz.camel.StreamMessage
import streamz.camel.StreamContext
import streamz.camel.StreamMessage
import streamz.camel.fs2.dsl._

val s1: Stream[IO, StreamMessage[String]] = receive[IO, String]("seda:q1")
Expand All @@ -73,7 +73,7 @@ val s1b: Stream[IO, String] = receiveBody[IO, String]("seda:q1")

This is equivalent to `receive[IO, String]("seda:q1").map(_.body)`.

`receive` and `receiveBody` can only be used with endpoints that create [in-only message exchanges](http://camel.apache.org/exchange-pattern.html).
`receive` and `receiveBody` can only be used with endpoints that create [in-only message exchanges](http://camel.apache.org/exchange-pattern.html).

#### Receiving in-out message exchanges from an endpoint

Expand All @@ -87,9 +87,7 @@ For sending a `StreamMessage` to a Camel endpoint, the `send` combinator should
val s2: Stream[IO, StreamMessage[String]] = s1.send("seda:q2")
```

This initiates an in-only message [exchange](http://camel.apache.org/exchange.html) with an endpoint and continues the stream with the sent `StreamMessage`.

The `send` combinator is not only available for streams of type `Stream[IO, StreamMessage[A]]` but more generally for any `Stream[F, A]` where `F: ContextShift: Async`.
This initiates an in-only message [exchange](http://camel.apache.org/exchange.html) with an endpoint and continues the stream with the sent `StreamMessage`.

```scala
val s2b: Stream[IO, String] = s1b.send("seda:q2")
Expand All @@ -105,9 +103,7 @@ For sending a request `StreamMessage` to an endpoint and obtaining a reply, the
val s3: Stream[IO, StreamMessage[Int]] = s2.sendRequest[Int]("bean:service?method=weight")
```

This initiates an in-out message exchange with the endpoint and continues the stream with the output `StreamMessage`. Here, a [Bean endpoint](https://camel.apache.org/bean.html) is used to call the `weight(String): Int` method on an object that is registered in the `CamelContext` under the name `service`. The input message body is used as `weight` call argument, the output message body is assigned the return value. The `sendRequest` type parameter (`Int`) specifies the expected output value type. The output message body can also be converted to another type provided that an appropriate Camel type converter is available (`Double`, for example).

The `sendRequest` combinator is not only available for streams of type `Stream[IO, StreamMessage[A]]` but more generally for any `Stream[F, A]` where `F: ContextShift: Async`.
This initiates an in-out message exchange with the endpoint and continues the stream with the output `StreamMessage`. Here, a [Bean endpoint](https://camel.apache.org/bean.html) is used to call the `weight(String): Int` method on an object that is registered in the `CamelContext` under the name `service`. The input message body is used as `weight` call argument, the output message body is assigned the return value. The `sendRequest` type parameter (`Int`) specifies the expected output value type. The output message body can also be converted to another type provided that an appropriate Camel type converter is available (`Double`, for example).

```scala
val s3b: Stream[IO, Int] = s2b.sendRequest[Int]("bean:service?method=weight")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@ package streamz.camel.fs2

import java.util.concurrent.TimeUnit

import cats.effect.{ Async, ContextShift }
import cats.implicits._
import cats.effect.Async
import fs2._
import org.apache.camel.spi.Synchronization
import org.apache.camel.{ Exchange, ExchangePattern }
Expand All @@ -33,7 +32,7 @@ package object dsl {
/**
* Camel endpoint combinators for [[StreamMessage]] streams of type `Stream[F, StreamMessage[A]]`.
*/
implicit class SendDsl[F[_]: ContextShift: Async, A](self: Stream[F, StreamMessage[A]]) {
implicit class SendDsl[F[_]: Async, A](self: Stream[F, StreamMessage[A]]) {
/**
* @see [[dsl.send]]
*/
Expand All @@ -50,7 +49,7 @@ package object dsl {
/**
* Camel endpoint combinators for [[StreamMessage]] body streams of type `Stream[F, A]`.
*/
implicit class SendBodyDsl[F[_]: ContextShift: Async, A](self: Stream[F, A]) {
implicit class SendBodyDsl[F[_]: Async, A](self: Stream[F, A]) {
/**
* @see [[dsl.sendBody]]
*/
Expand All @@ -71,13 +70,13 @@ package object dsl {
/**
* @see [[dsl.send]]
*/
def send[F[_]](uri: String)(implicit context: StreamContext, contextShift: ContextShift[F], async: Async[F]): Stream[F, StreamMessage[A]] =
def send[F[_]](uri: String)(implicit context: StreamContext, async: Async[F]): Stream[F, StreamMessage[A]] =
new SendDsl[F, A](self.covary[F]).send(uri)

/**
* @see [[dsl.sendRequest()]]
*/
def sendRequest[F[_], B](uri: String)(implicit context: StreamContext, tag: ClassTag[B], contextShift: ContextShift[F], async: Async[F]): Stream[F, StreamMessage[B]] =
def sendRequest[F[_], B](uri: String)(implicit context: StreamContext, tag: ClassTag[B], async: Async[F]): Stream[F, StreamMessage[B]] =
new SendDsl[F, A](self.covary[F]).sendRequest(uri)
}

Expand All @@ -88,13 +87,13 @@ package object dsl {
/**
* @see [[dsl.sendBody]]
*/
def send[F[_]: ContextShift: Async](uri: String)(implicit context: StreamContext): Stream[F, A] =
def send[F[_]: Async](uri: String)(implicit context: StreamContext): Stream[F, A] =
new SendBodyDsl[F, A](self.covary[F]).send(uri)

/**
* @see [[dsl.sendRequestBody]]
*/
def sendRequest[F[_]: ContextShift: Async, B](uri: String)(implicit context: StreamContext, tag: ClassTag[B]): Stream[F, B] =
def sendRequest[F[_]: Async, B](uri: String)(implicit context: StreamContext, tag: ClassTag[B]): Stream[F, B] =
new SendBodyDsl[F, A](self.covary[F]).sendRequest(uri)
}

Expand All @@ -110,7 +109,7 @@ package object dsl {
* @param uri Camel endpoint URI.
* @throws org.apache.camel.TypeConversionException if type conversion fails.
*/
def receive[F[_]: ContextShift: Async, A](uri: String)(implicit context: StreamContext, tag: ClassTag[A]): Stream[F, StreamMessage[A]] = {
def receive[F[_]: Async, A](uri: String)(implicit context: StreamContext, tag: ClassTag[A]): Stream[F, StreamMessage[A]] = {
consume(uri).filter(_ != null)
}

Expand All @@ -126,7 +125,7 @@ package object dsl {
* @param uri Camel endpoint URI.
* @throws org.apache.camel.TypeConversionException if type conversion fails.
*/
def receiveBody[F[_]: ContextShift: Async, A](uri: String)(implicit context: StreamContext, tag: ClassTag[A]): Stream[F, A] =
def receiveBody[F[_]: Async, A](uri: String)(implicit context: StreamContext, tag: ClassTag[A]): Stream[F, A] =
receive(uri).map(_.body)

/**
Expand All @@ -136,7 +135,7 @@ package object dsl {
*
* @param uri Camel endpoint URI.
*/
def send[F[_]: ContextShift: Async, A](uri: String)(implicit context: StreamContext): Pipe[F, StreamMessage[A], StreamMessage[A]] =
def send[F[_]: Async, A](uri: String)(implicit context: StreamContext): Pipe[F, StreamMessage[A], StreamMessage[A]] =
produce[F, A, A](uri, ExchangePattern.InOnly, (message, _) => message)

/**
Expand All @@ -146,7 +145,7 @@ package object dsl {
*
* @param uri Camel endpoint URI.
*/
def sendBody[F[_]: ContextShift: Async, A](uri: String)(implicit context: StreamContext): Pipe[F, A, A] =
def sendBody[F[_]: Async, A](uri: String)(implicit context: StreamContext): Pipe[F, A, A] =
s => s.map(StreamMessage(_)).through(send(uri)).map(_.body)

/**
Expand All @@ -158,7 +157,7 @@ package object dsl {
* @param uri Camel endpoint URI.
* @throws org.apache.camel.TypeConversionException if type conversion fails.
*/
def sendRequest[F[_]: ContextShift: Async, A, B](uri: String)(implicit context: StreamContext, tag: ClassTag[B]): Pipe[F, StreamMessage[A], StreamMessage[B]] =
def sendRequest[F[_]: Async, A, B](uri: String)(implicit context: StreamContext, tag: ClassTag[B]): Pipe[F, StreamMessage[A], StreamMessage[B]] =
produce[F, A, B](uri, ExchangePattern.InOut, (_, exchange) => StreamMessage.from[B](exchange.getOut))

/**
Expand All @@ -170,13 +169,13 @@ package object dsl {
* @param uri Camel endpoint URI.
* @throws org.apache.camel.TypeConversionException if type conversion fails.
*/
def sendRequestBody[F[_]: ContextShift: Async, A, B](uri: String)(implicit context: StreamContext, tag: ClassTag[B]): Pipe[F, A, B] =
def sendRequestBody[F[_]: Async, A, B](uri: String)(implicit context: StreamContext, tag: ClassTag[B]): Pipe[F, A, B] =
s => s.map(StreamMessage(_)).through(sendRequest[F, A, B](uri)).map(_.body)

private def consume[F[_], A](uri: String)(implicit context: StreamContext, tag: ClassTag[A], contextShift: ContextShift[F], F: Async[F]): Stream[F, StreamMessage[A]] = {
private def consume[F[_], A](uri: String)(implicit context: StreamContext, tag: ClassTag[A], F: Async[F]): Stream[F, StreamMessage[A]] = {
val timeout = context.config.getDuration("streamz.camel.consumer.receive.timeout", TimeUnit.MILLISECONDS)
Stream.repeatEval {
contextShift.shift >> F.async[StreamMessage[A]] { callback =>
F.async_[StreamMessage[A]] { callback =>
Try(context.consumerTemplate.receive(uri, timeout)) match {
case Success(null) =>
callback(Right(null))
Expand All @@ -199,10 +198,10 @@ package object dsl {
}
}

private def produce[F[_], A, B](uri: String, pattern: ExchangePattern, result: (StreamMessage[A], Exchange) => StreamMessage[B])(implicit context: StreamContext, contextShift: ContextShift[F], F: Async[F]): Pipe[F, StreamMessage[A], StreamMessage[B]] = { s =>
private def produce[F[_], A, B](uri: String, pattern: ExchangePattern, result: (StreamMessage[A], Exchange) => StreamMessage[B])(implicit context: StreamContext, F: Async[F]): Pipe[F, StreamMessage[A], StreamMessage[B]] = { s =>
s.flatMap { message =>
Stream.eval {
contextShift.shift >> F.async[StreamMessage[B]] { callback =>
F.async_[StreamMessage[B]] { callback =>
context.producerTemplate.asyncCallback(uri, context.createExchange(message, pattern), new Synchronization {
override def onFailure(exchange: Exchange): Unit =
callback(Left(exchange.getException))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,15 @@ import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec

class DslSpec extends AnyWordSpec with Matchers with BeforeAndAfterAll {
import cats.effect.unsafe.implicits.global

val camelRegistry = new SimpleRegistry
val camelContext = new DefaultCamelContext()

camelContext.setRegistry(camelRegistry)
camelRegistry.put("service", new Service)

implicit val streamContext = new StreamContext(camelContext)
implicit val contextShift = IO.contextShift(scala.concurrent.ExecutionContext.global)

import streamContext._

Expand Down
3 changes: 2 additions & 1 deletion streamz-converter/build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ name := "streamz-converter"

libraryDependencies ++= Seq(
"co.fs2" %% "fs2-core" % Version.Fs2,
"org.typelevel" %% "cats-effect" % Version.CatsEffect,
"org.typelevel" %% "cats-effect-std" % Version.CatsEffect,
"org.typelevel" %% "cats-effect" % Version.CatsEffect % "test",
"com.typesafe.akka" %% "akka-stream" % Version.Akka,
"com.typesafe.akka" %% "akka-stream-testkit" % Version.Akka % "test",
"com.typesafe.akka" %% "akka-testkit" % Version.Akka % "test",
Expand Down
Loading