diff --git a/couchbase/src/main/scala/akka/stream/alpakka/couchbase/impl/SetupStage.scala b/couchbase/src/main/scala/akka/stream/alpakka/couchbase/impl/SetupStage.scala deleted file mode 100644 index b94abdf40e..0000000000 --- a/couchbase/src/main/scala/akka/stream/alpakka/couchbase/impl/SetupStage.scala +++ /dev/null @@ -1,138 +0,0 @@ -/* - * Copyright (C) 2016-2019 Lightbend Inc. - */ - -package akka.stream.alpakka.couchbase.impl - -import akka.annotation.InternalApi -import akka.stream._ -import akka.stream.scaladsl.{Flow, Keep, Sink, Source} -import akka.stream.stage._ - -import scala.concurrent.{Future, Promise} - -/** - * Internal API. - */ -@InternalApi private final class SetupFlowStage[T, U, M](factory: ActorMaterializer => Attributes => Flow[T, U, M]) - extends GraphStageWithMaterializedValue[FlowShape[T, U], Future[M]] { - - private val in = Inlet[T]("SetupFlowStage.in") - private val out = Outlet[U]("SetupFlowStage.out") - override val shape = FlowShape(in, out) - - override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[M]) = { - val matPromise = Promise[M] - (createStageLogic(matPromise), matPromise.future) - } - - private def createStageLogic(matPromise: Promise[M]) = new GraphStageLogic(shape) { - import SetupStage._ - - val subInlet = new SubSinkInlet[U]("SetupFlowStage") - val subOutlet = new SubSourceOutlet[T]("SetupFlowStage") - - subInlet.setHandler(delegateToOutlet(push(out, _: U), () => complete(out), fail(out, _), subInlet)) - subOutlet.setHandler(delegateToInlet(() => pull(in), () => cancel(in))) - - setHandler(in, delegateToSubOutlet(() => grab(in), subOutlet)) - setHandler(out, delegateToSubInlet(subInlet)) - - override def preStart(): Unit = { - val flow = factory(actorMaterializer(materializer))(attributes) - - val mat = Source - .fromGraph(subOutlet.source) - .viaMat(flow)(Keep.right) - .to(Sink.fromGraph(subInlet.sink)) - .run()(subFusingMaterializer) - matPromise.success(mat) - } - } -} - -/** - * Internal API. - */ -@InternalApi private final class SetupSourceStage[T, M](factory: ActorMaterializer => Attributes => Source[T, M]) - extends GraphStageWithMaterializedValue[SourceShape[T], Future[M]] { - - private val out = Outlet[T]("SetupSourceStage.out") - override val shape = SourceShape(out) - - override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[M]) = { - val matPromise = Promise[M] - (createStageLogic(matPromise), matPromise.future) - } - - private def createStageLogic(matPromise: Promise[M]) = new GraphStageLogic(shape) { - import SetupStage._ - - val subInlet = new SubSinkInlet[T]("SetupSourceStage") - subInlet.setHandler(delegateToOutlet(push(out, _: T), () => complete(out), fail(out, _), subInlet)) - setHandler(out, delegateToSubInlet(subInlet)) - - override def preStart(): Unit = { - val source = factory(actorMaterializer(materializer))(attributes) - - val mat = source - .to(Sink.fromGraph(subInlet.sink)) - .run()(subFusingMaterializer) - matPromise.success(mat) - } - } -} - -private object SetupStage { - def delegateToSubOutlet[T](grab: () => T, subOutlet: GraphStageLogic#SubSourceOutlet[T]) = new InHandler { - override def onPush(): Unit = - subOutlet.push(grab()) - override def onUpstreamFinish(): Unit = - subOutlet.complete() - override def onUpstreamFailure(ex: Throwable): Unit = - subOutlet.fail(ex) - } - - def delegateToOutlet[T](push: T => Unit, - complete: () => Unit, - fail: Throwable => Unit, - subInlet: GraphStageLogic#SubSinkInlet[T]) = new InHandler { - override def onPush(): Unit = - push(subInlet.grab()) - override def onUpstreamFinish(): Unit = - complete() - override def onUpstreamFailure(ex: Throwable): Unit = - fail(ex) - } - - def delegateToSubInlet[T](subInlet: GraphStageLogic#SubSinkInlet[T]) = new OutHandler { - override def onPull(): Unit = - subInlet.pull() - override def onDownstreamFinish(): Unit = - subInlet.cancel() - } - - def delegateToInlet(pull: () => Unit, cancel: () => Unit) = new OutHandler { - override def onPull(): Unit = - pull() - override def onDownstreamFinish(): Unit = - cancel() - } - - def actorMaterializer(mat: Materializer): ActorMaterializer = mat match { - case am: ActorMaterializer => am - case _ => throw new Error("ActorMaterializer required") - } -} - -/** - * Internal API. - */ -@InternalApi private[couchbase] object Setup { - - def flow[T, U, M](factory: ActorMaterializer => Attributes => Flow[T, U, M]): Flow[T, U, Future[M]] = - Flow.fromGraph(new SetupFlowStage(factory)) - - def source[T, M](factory: ActorMaterializer => Attributes => Source[T, M]): Source[T, Future[M]] = - Source.fromGraph(new SetupSourceStage(factory)) -} diff --git a/couchbase/src/main/scala/akka/stream/alpakka/couchbase/scaladsl/CouchbaseFlow.scala b/couchbase/src/main/scala/akka/stream/alpakka/couchbase/scaladsl/CouchbaseFlow.scala index b9b4cb5515..6575011795 100644 --- a/couchbase/src/main/scala/akka/stream/alpakka/couchbase/scaladsl/CouchbaseFlow.scala +++ b/couchbase/src/main/scala/akka/stream/alpakka/couchbase/scaladsl/CouchbaseFlow.scala @@ -4,7 +4,6 @@ package akka.stream.alpakka.couchbase.scaladsl import akka.NotUsed -import akka.stream.alpakka.couchbase.impl.Setup import akka.stream.alpakka.couchbase._ import akka.stream.scaladsl.Flow import com.couchbase.client.java.document.{Document, JsonDocument} @@ -18,12 +17,13 @@ object CouchbaseFlow { * Create a flow to query Couchbase for by `id` and emit [[com.couchbase.client.java.document.JsonDocument JsonDocument]]s. */ def fromId(sessionSettings: CouchbaseSessionSettings, bucketName: String): Flow[String, JsonDocument, NotUsed] = - Setup - .flow { materializer => _ => - val session = CouchbaseSessionRegistry(materializer.system).sessionFor(sessionSettings, bucketName) - Flow[String] - .mapAsync(1)(id => session.flatMap(_.get(id /* timeout? */ ))(materializer.system.dispatcher)) - .collect { case Some(doc) => doc } + Flow + .setup { + case (materializer, _) => + val session = CouchbaseSessionRegistry(materializer.system).sessionFor(sessionSettings, bucketName) + Flow[String] + .mapAsync(1)(id => session.flatMap(_.get(id /* timeout? */ ))(materializer.system.dispatcher)) + .collect { case Some(doc) => doc } } .mapMaterializedValue(_ => NotUsed) @@ -33,12 +33,13 @@ object CouchbaseFlow { def fromId[T <: Document[_]](sessionSettings: CouchbaseSessionSettings, bucketName: String, target: Class[T]): Flow[String, T, NotUsed] = - Setup - .flow { materializer => _ => - val session = CouchbaseSessionRegistry(materializer.system).sessionFor(sessionSettings, bucketName) - Flow[String] - .mapAsync(1)(id => session.flatMap(_.get(id /* timeout? */, target))(materializer.system.dispatcher)) - .collect { case Some(doc) => doc } + Flow + .setup { + case (materializer, _) => + val session = CouchbaseSessionRegistry(materializer.system).sessionFor(sessionSettings, bucketName) + Flow[String] + .mapAsync(1)(id => session.flatMap(_.get(id /* timeout? */, target))(materializer.system.dispatcher)) + .collect { case Some(doc) => doc } } .mapMaterializedValue(_ => NotUsed) @@ -48,13 +49,14 @@ object CouchbaseFlow { def upsert(sessionSettings: CouchbaseSessionSettings, writeSettings: CouchbaseWriteSettings, bucketName: String): Flow[JsonDocument, JsonDocument, NotUsed] = - Setup - .flow { materializer => _ => - val session = CouchbaseSessionRegistry(materializer.system).sessionFor(sessionSettings, bucketName) - Flow[JsonDocument] - .mapAsync(writeSettings.parallelism)( - doc => session.flatMap(_.upsert(doc, writeSettings))(materializer.system.dispatcher) - ) + Flow + .setup { + case (materializer, _) => + val session = CouchbaseSessionRegistry(materializer.system).sessionFor(sessionSettings, bucketName) + Flow[JsonDocument] + .mapAsync(writeSettings.parallelism)( + doc => session.flatMap(_.upsert(doc, writeSettings))(materializer.system.dispatcher) + ) } .mapMaterializedValue(_ => NotUsed) @@ -64,13 +66,14 @@ object CouchbaseFlow { def upsertDoc[T <: Document[_]](sessionSettings: CouchbaseSessionSettings, writeSettings: CouchbaseWriteSettings, bucketName: String): Flow[T, T, NotUsed] = - Setup - .flow { materializer => _ => - val session = CouchbaseSessionRegistry(materializer.system).sessionFor(sessionSettings, bucketName) - Flow[T] - .mapAsync(writeSettings.parallelism)( - doc => session.flatMap(_.upsertDoc(doc, writeSettings))(materializer.system.dispatcher) - ) + Flow + .setup { + case (materializer, _) => + val session = CouchbaseSessionRegistry(materializer.system).sessionFor(sessionSettings, bucketName) + Flow[T] + .mapAsync(writeSettings.parallelism)( + doc => session.flatMap(_.upsertDoc(doc, writeSettings))(materializer.system.dispatcher) + ) } .mapMaterializedValue(_ => NotUsed) @@ -81,21 +84,22 @@ object CouchbaseFlow { def upsertDocWithResult[T <: Document[_]](sessionSettings: CouchbaseSessionSettings, writeSettings: CouchbaseWriteSettings, bucketName: String): Flow[T, CouchbaseWriteResult[T], NotUsed] = - Setup - .flow { materializer => _ => - val session = CouchbaseSessionRegistry(materializer.system).sessionFor(sessionSettings, bucketName) - Flow[T] - .mapAsync(writeSettings.parallelism)( - doc => { - implicit val executor = materializer.system.dispatcher - session - .flatMap(_.upsertDoc(doc, writeSettings)) - .map(_ => CouchbaseWriteSuccess(doc)) - .recover { - case exception => CouchbaseWriteFailure(doc, exception) - } - } - ) + Flow + .setup { + case (materializer, _) => + val session = CouchbaseSessionRegistry(materializer.system).sessionFor(sessionSettings, bucketName) + Flow[T] + .mapAsync(writeSettings.parallelism)( + doc => { + implicit val executor = materializer.system.dispatcher + session + .flatMap(_.upsertDoc(doc, writeSettings)) + .map(_ => CouchbaseWriteSuccess(doc)) + .recover { + case exception => CouchbaseWriteFailure(doc, exception) + } + } + ) } .mapMaterializedValue(_ => NotUsed) @@ -105,18 +109,19 @@ object CouchbaseFlow { def delete(sessionSettings: CouchbaseSessionSettings, writeSettings: CouchbaseWriteSettings, bucketName: String): Flow[String, String, NotUsed] = - Setup - .flow { materializer => _ => - val session = CouchbaseSessionRegistry(materializer.system).sessionFor(sessionSettings, bucketName) - Flow[String] - .mapAsync(writeSettings.parallelism)( - id => { - implicit val executor = materializer.system.dispatcher - session - .flatMap(_.remove(id, writeSettings)) - .map(_ => id) - } - ) + Flow + .setup { + case (materializer, _) => + val session = CouchbaseSessionRegistry(materializer.system).sessionFor(sessionSettings, bucketName) + Flow[String] + .mapAsync(writeSettings.parallelism)( + id => { + implicit val executor = materializer.system.dispatcher + session + .flatMap(_.remove(id, writeSettings)) + .map(_ => id) + } + ) } .mapMaterializedValue(_ => NotUsed) @@ -126,21 +131,22 @@ object CouchbaseFlow { def deleteWithResult(sessionSettings: CouchbaseSessionSettings, writeSettings: CouchbaseWriteSettings, bucketName: String): Flow[String, CouchbaseDeleteResult, NotUsed] = - Setup - .flow { materializer => _ => - val session = CouchbaseSessionRegistry(materializer.system).sessionFor(sessionSettings, bucketName) - Flow[String] - .mapAsync(writeSettings.parallelism)( - id => { - implicit val executor = materializer.system.dispatcher - session - .flatMap(_.remove(id, writeSettings)) - .map(_ => CouchbaseDeleteSuccess(id)) - .recover { - case exception => CouchbaseDeleteFailure(id, exception) - } - } - ) + Flow + .setup { + case (materializer, _) => + val session = CouchbaseSessionRegistry(materializer.system).sessionFor(sessionSettings, bucketName) + Flow[String] + .mapAsync(writeSettings.parallelism)( + id => { + implicit val executor = materializer.system.dispatcher + session + .flatMap(_.remove(id, writeSettings)) + .map(_ => CouchbaseDeleteSuccess(id)) + .recover { + case exception => CouchbaseDeleteFailure(id, exception) + } + } + ) } .mapMaterializedValue(_ => NotUsed) } diff --git a/couchbase/src/main/scala/akka/stream/alpakka/couchbase/scaladsl/CouchbaseSource.scala b/couchbase/src/main/scala/akka/stream/alpakka/couchbase/scaladsl/CouchbaseSource.scala index 14f585726c..18d339e3c8 100644 --- a/couchbase/src/main/scala/akka/stream/alpakka/couchbase/scaladsl/CouchbaseSource.scala +++ b/couchbase/src/main/scala/akka/stream/alpakka/couchbase/scaladsl/CouchbaseSource.scala @@ -5,7 +5,6 @@ package akka.stream.alpakka.couchbase.scaladsl import akka.NotUsed -import akka.stream.alpakka.couchbase.impl.Setup import akka.stream.alpakka.couchbase.{CouchbaseSessionRegistry, CouchbaseSessionSettings} import akka.stream.scaladsl.Source import com.couchbase.client.java.document.json.JsonObject @@ -22,12 +21,13 @@ object CouchbaseSource { def fromStatement(sessionSettings: CouchbaseSessionSettings, statement: Statement, bucketName: String): Source[JsonObject, NotUsed] = - Setup - .source { materializer => _ => - val session = CouchbaseSessionRegistry(materializer.system).sessionFor(sessionSettings, bucketName) - Source - .fromFuture(session.map(_.streamedQuery(statement))(materializer.system.dispatcher)) - .flatMapConcat(identity) + Source + .setup { + case (materializer, _) => + val session = CouchbaseSessionRegistry(materializer.system).sessionFor(sessionSettings, bucketName) + Source + .fromFuture(session.map(_.streamedQuery(statement))(materializer.system.dispatcher)) + .flatMapConcat(identity) } .mapMaterializedValue(_ => NotUsed) @@ -37,12 +37,13 @@ object CouchbaseSource { def fromN1qlQuery(sessionSettings: CouchbaseSessionSettings, query: N1qlQuery, bucketName: String): Source[JsonObject, NotUsed] = - Setup - .source { materializer => _ => - val session = CouchbaseSessionRegistry(materializer.system).sessionFor(sessionSettings, bucketName) - Source - .fromFuture(session.map(_.streamedQuery(query))(materializer.system.dispatcher)) - .flatMapConcat(identity) + Source + .setup { + case (materializer, _) => + val session = CouchbaseSessionRegistry(materializer.system).sessionFor(sessionSettings, bucketName) + Source + .fromFuture(session.map(_.streamedQuery(query))(materializer.system.dispatcher)) + .flatMapConcat(identity) } .mapMaterializedValue(_ => NotUsed) diff --git a/dynamodb/src/main/scala/akka/stream/alpakka/dynamodb/impl/SetupStage.scala b/dynamodb/src/main/scala/akka/stream/alpakka/dynamodb/impl/SetupStage.scala deleted file mode 100644 index 413923adab..0000000000 --- a/dynamodb/src/main/scala/akka/stream/alpakka/dynamodb/impl/SetupStage.scala +++ /dev/null @@ -1,159 +0,0 @@ -/* - * Copyright (C) 2016-2019 Lightbend Inc. - */ - -package akka.stream.alpakka.dynamodb.impl - -import akka.annotation.InternalApi -import akka.stream._ -import akka.stream.scaladsl.{Flow, Keep, Sink, Source} -import akka.stream.stage._ - -import scala.concurrent.{Future, Promise} - -private final class SetupSinkStage[T, M](factory: ActorMaterializer => Attributes => Sink[T, M]) - extends GraphStageWithMaterializedValue[SinkShape[T], Future[M]] { - - private val in = Inlet[T]("SetupSinkStage.in") - override val shape = SinkShape(in) - - override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[M]) = { - val matPromise = Promise[M] - (createStageLogic(matPromise), matPromise.future) - } - - private def createStageLogic(matPromise: Promise[M]) = new GraphStageLogic(shape) { - import SetupStage._ - - val subOutlet = new SubSourceOutlet[T]("SetupSinkStage") - subOutlet.setHandler(delegateToInlet(() => pull(in), () => cancel(in))) - setHandler(in, delegateToSubOutlet(() => grab(in), subOutlet)) - - override def preStart(): Unit = { - val sink = factory(actorMaterializer(materializer))(attributes) - - val mat = Source.fromGraph(subOutlet.source).runWith(sink)(subFusingMaterializer) - matPromise.success(mat) - } - } - -} - -private final class SetupFlowStage[T, U, M](factory: ActorMaterializer => Attributes => Flow[T, U, M]) - extends GraphStageWithMaterializedValue[FlowShape[T, U], Future[M]] { - - private val in = Inlet[T]("SetupFlowStage.in") - private val out = Outlet[U]("SetupFlowStage.out") - override val shape = FlowShape(in, out) - - override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[M]) = { - val matPromise = Promise[M] - (createStageLogic(matPromise), matPromise.future) - } - - private def createStageLogic(matPromise: Promise[M]) = new GraphStageLogic(shape) { - import SetupStage._ - - val subInlet = new SubSinkInlet[U]("SetupFlowStage") - val subOutlet = new SubSourceOutlet[T]("SetupFlowStage") - - subInlet.setHandler(delegateToOutlet(push(out, _: U), () => complete(out), fail(out, _), subInlet)) - subOutlet.setHandler(delegateToInlet(() => pull(in), () => cancel(in))) - - setHandler(in, delegateToSubOutlet(() => grab(in), subOutlet)) - setHandler(out, delegateToSubInlet(subInlet)) - - override def preStart(): Unit = { - val flow = factory(actorMaterializer(materializer))(attributes) - - val mat = Source - .fromGraph(subOutlet.source) - .viaMat(flow)(Keep.right) - .to(Sink.fromGraph(subInlet.sink)) - .run()(subFusingMaterializer) - matPromise.success(mat) - } - } -} - -private final class SetupSourceStage[T, M](factory: ActorMaterializer => Attributes => Source[T, M]) - extends GraphStageWithMaterializedValue[SourceShape[T], Future[M]] { - - private val out = Outlet[T]("SetupSourceStage.out") - override val shape = SourceShape(out) - - override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[M]) = { - val matPromise = Promise[M] - (createStageLogic(matPromise), matPromise.future) - } - - private def createStageLogic(matPromise: Promise[M]) = new GraphStageLogic(shape) { - import SetupStage._ - - val subInlet = new SubSinkInlet[T]("SetupSourceStage") - subInlet.setHandler(delegateToOutlet(push(out, _: T), () => complete(out), fail(out, _), subInlet)) - setHandler(out, delegateToSubInlet(subInlet)) - - override def preStart(): Unit = { - val source = factory(actorMaterializer(materializer))(attributes) - - val mat = source - .to(Sink.fromGraph(subInlet.sink)) - .run()(subFusingMaterializer) - matPromise.success(mat) - } - } -} - -private object SetupStage { - def delegateToSubOutlet[T](grab: () => T, subOutlet: GraphStageLogic#SubSourceOutlet[T]) = new InHandler { - override def onPush(): Unit = - subOutlet.push(grab()) - override def onUpstreamFinish(): Unit = - subOutlet.complete() - override def onUpstreamFailure(ex: Throwable): Unit = - subOutlet.fail(ex) - } - - def delegateToOutlet[T](push: T => Unit, - complete: () => Unit, - fail: Throwable => Unit, - subInlet: GraphStageLogic#SubSinkInlet[T]) = new InHandler { - override def onPush(): Unit = - push(subInlet.grab()) - override def onUpstreamFinish(): Unit = - complete() - override def onUpstreamFailure(ex: Throwable): Unit = - fail(ex) - } - - def delegateToSubInlet[T](subInlet: GraphStageLogic#SubSinkInlet[T]) = new OutHandler { - override def onPull(): Unit = - subInlet.pull() - override def onDownstreamFinish(): Unit = - subInlet.cancel() - } - - def delegateToInlet(pull: () => Unit, cancel: () => Unit) = new OutHandler { - override def onPull(): Unit = - pull() - override def onDownstreamFinish(): Unit = - cancel() - } - - def actorMaterializer(mat: Materializer): ActorMaterializer = mat match { - case am: ActorMaterializer => am - case _ => throw new Error("ActorMaterializer required") - } -} - -@InternalApi private[dynamodb] object Setup { - def sink[T, M](factory: ActorMaterializer => Attributes => Sink[T, M]): Sink[T, Future[M]] = - Sink.fromGraph(new SetupSinkStage(factory)) - - def flow[T, U, M](factory: ActorMaterializer => Attributes => Flow[T, U, M]): Flow[T, U, Future[M]] = - Flow.fromGraph(new SetupFlowStage(factory)) - - def source[T, M](factory: ActorMaterializer => Attributes => Source[T, M]): Source[T, Future[M]] = - Source.fromGraph(new SetupSourceStage(factory)) -} diff --git a/dynamodb/src/main/scala/akka/stream/alpakka/dynamodb/scaladsl/DynamoDb.scala b/dynamodb/src/main/scala/akka/stream/alpakka/dynamodb/scaladsl/DynamoDb.scala index aae310e319..b6b084e695 100644 --- a/dynamodb/src/main/scala/akka/stream/alpakka/dynamodb/scaladsl/DynamoDb.scala +++ b/dynamodb/src/main/scala/akka/stream/alpakka/dynamodb/scaladsl/DynamoDb.scala @@ -6,7 +6,7 @@ package akka.stream.alpakka.dynamodb.scaladsl import akka.NotUsed import akka.stream.{ActorMaterializer, Attributes, Materializer} -import akka.stream.alpakka.dynamodb.impl.{Paginator, Setup} +import akka.stream.alpakka.dynamodb.impl.Paginator import akka.stream.alpakka.dynamodb.{AwsOp, AwsPagedOp, DynamoAttributes, DynamoClientExt} import akka.stream.scaladsl.{Flow, Sink, Source} @@ -21,17 +21,16 @@ object DynamoDb { * Create a Flow that emits a response for every request to DynamoDB. */ def flow[Op <: AwsOp]: Flow[Op, Op#B, NotUsed] = - Setup - .flow(clientFlow[Op]) + Flow.setup(clientFlow[Op]) .mapMaterializedValue(_ => NotUsed) /** * Create a Source that will emit potentially multiple responses for a given request. */ def source(op: AwsPagedOp): Source[op.B, NotUsed] = - Setup - .source { mat => attr => - Paginator.source(clientFlow(mat)(attr), op) + Source + .setup { case (mat, attr) => + Paginator.source(clientFlow(mat, attr), op) } .mapMaterializedValue(_ => NotUsed) @@ -47,7 +46,7 @@ object DynamoDb { def single(op: AwsOp)(implicit mat: Materializer): Future[op.B] = source(op).runWith(Sink.head) - private def clientFlow[Op <: AwsOp](mat: ActorMaterializer)(attr: Attributes) = + private def clientFlow[Op <: AwsOp](mat: ActorMaterializer, attr: Attributes) = attr .get[DynamoAttributes.Client] .map(_.client) diff --git a/google-cloud-pub-sub-grpc/src/main/scala/akka/stream/alpakka/googlecloud/pubsub/grpc/impl/SetupStage.scala b/google-cloud-pub-sub-grpc/src/main/scala/akka/stream/alpakka/googlecloud/pubsub/grpc/impl/SetupStage.scala deleted file mode 100644 index 1b1ffbddf2..0000000000 --- a/google-cloud-pub-sub-grpc/src/main/scala/akka/stream/alpakka/googlecloud/pubsub/grpc/impl/SetupStage.scala +++ /dev/null @@ -1,174 +0,0 @@ -/* - * Copyright (C) 2016-2019 Lightbend Inc. - */ - -package akka.stream.alpakka.googlecloud.pubsub.grpc.impl - -import akka.annotation.InternalApi -import akka.stream._ -import akka.stream.scaladsl.{Flow, Keep, Sink, Source} -import akka.stream.javadsl -import akka.stream.stage._ - -import scala.concurrent.{Future, Promise} - -private final class SetupSinkStage[T, M](factory: ActorMaterializer => Attributes => Sink[T, M]) - extends GraphStageWithMaterializedValue[SinkShape[T], Future[M]] { - - private val in = Inlet[T]("SetupSinkStage.in") - override val shape = SinkShape(in) - - override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[M]) = { - val matPromise = Promise[M] - (createStageLogic(matPromise), matPromise.future) - } - - private def createStageLogic(matPromise: Promise[M]) = new GraphStageLogic(shape) { - import SetupStage._ - - val subOutlet = new SubSourceOutlet[T]("SetupSinkStage") - subOutlet.setHandler(delegateToInlet(() => pull(in), () => cancel(in))) - setHandler(in, delegateToSubOutlet(() => grab(in), subOutlet)) - - override def preStart(): Unit = { - val sink = factory(actorMaterializer(materializer))(attributes) - - val mat = Source.fromGraph(subOutlet.source).runWith(sink.withAttributes(attributes))(subFusingMaterializer) - matPromise.success(mat) - } - } - -} - -private final class SetupFlowStage[T, U, M](factory: ActorMaterializer => Attributes => Flow[T, U, M]) - extends GraphStageWithMaterializedValue[FlowShape[T, U], Future[M]] { - - private val in = Inlet[T]("SetupFlowStage.in") - private val out = Outlet[U]("SetupFlowStage.out") - override val shape = FlowShape(in, out) - - override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[M]) = { - val matPromise = Promise[M] - (createStageLogic(matPromise), matPromise.future) - } - - private def createStageLogic(matPromise: Promise[M]) = new GraphStageLogic(shape) { - import SetupStage._ - - val subInlet = new SubSinkInlet[U]("SetupFlowStage") - val subOutlet = new SubSourceOutlet[T]("SetupFlowStage") - - subInlet.setHandler(delegateToOutlet(push(out, _: U), () => complete(out), fail(out, _), subInlet)) - subOutlet.setHandler(delegateToInlet(() => pull(in), () => cancel(in))) - - setHandler(in, delegateToSubOutlet(() => grab(in), subOutlet)) - setHandler(out, delegateToSubInlet(subInlet)) - - override def preStart(): Unit = { - val flow = factory(actorMaterializer(materializer))(attributes) - - val mat = Source - .fromGraph(subOutlet.source) - .viaMat(flow.withAttributes(attributes))(Keep.right) - .to(Sink.fromGraph(subInlet.sink)) - .run()(subFusingMaterializer) - matPromise.success(mat) - } - } -} - -private final class SetupSourceStage[T, M](factory: ActorMaterializer => Attributes => Source[T, M]) - extends GraphStageWithMaterializedValue[SourceShape[T], Future[M]] { - - private val out = Outlet[T]("SetupSourceStage.out") - override val shape = SourceShape(out) - - override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[M]) = { - val matPromise = Promise[M] - (createStageLogic(matPromise), matPromise.future) - } - - private def createStageLogic(matPromise: Promise[M]) = new GraphStageLogic(shape) { - import SetupStage._ - - val subInlet = new SubSinkInlet[T]("SetupSourceStage") - subInlet.setHandler(delegateToOutlet(push(out, _: T), () => complete(out), fail(out, _), subInlet)) - setHandler(out, delegateToSubInlet(subInlet)) - - override def preStart(): Unit = { - val source = factory(actorMaterializer(materializer))(attributes) - - val mat = source - .withAttributes(attributes) - .to(Sink.fromGraph(subInlet.sink)) - .run()(subFusingMaterializer) - matPromise.success(mat) - } - } -} - -private object SetupStage { - def delegateToSubOutlet[T](grab: () => T, subOutlet: GraphStageLogic#SubSourceOutlet[T]) = new InHandler { - override def onPush(): Unit = - subOutlet.push(grab()) - override def onUpstreamFinish(): Unit = - subOutlet.complete() - override def onUpstreamFailure(ex: Throwable): Unit = - subOutlet.fail(ex) - } - - def delegateToOutlet[T](push: T => Unit, - complete: () => Unit, - fail: Throwable => Unit, - subInlet: GraphStageLogic#SubSinkInlet[T]) = new InHandler { - override def onPush(): Unit = - push(subInlet.grab()) - override def onUpstreamFinish(): Unit = - complete() - override def onUpstreamFailure(ex: Throwable): Unit = - fail(ex) - } - - def delegateToSubInlet[T](subInlet: GraphStageLogic#SubSinkInlet[T]) = new OutHandler { - override def onPull(): Unit = - subInlet.pull() - override def onDownstreamFinish(): Unit = - subInlet.cancel() - } - - def delegateToInlet(pull: () => Unit, cancel: () => Unit) = new OutHandler { - override def onPull(): Unit = - pull() - override def onDownstreamFinish(): Unit = - cancel() - } - - def actorMaterializer(mat: Materializer): ActorMaterializer = mat match { - case am: ActorMaterializer => am - case _ => throw new Error("ActorMaterializer required") - } -} - -@InternalApi private[grpc] object Setup { - def sink[T, M](factory: ActorMaterializer => Attributes => Sink[T, M]): Sink[T, Future[M]] = - Sink.fromGraph(new SetupSinkStage(factory)) - - def createSink[T, M](factory: ActorMaterializer => Attributes => javadsl.Sink[T, M]): javadsl.Sink[T, Future[M]] = - Sink.fromGraph(new SetupSinkStage(mat => attr => factory(mat)(attr).asScala)).asJava - - def flow[T, U, M](factory: ActorMaterializer => Attributes => Flow[T, U, M]): Flow[T, U, Future[M]] = - Flow.fromGraph(new SetupFlowStage(factory)) - - def createFlow[T, U, M]( - factory: ActorMaterializer => Attributes => javadsl.Flow[T, U, M] - ): javadsl.Flow[T, U, Future[M]] = - Flow.fromGraph(new SetupFlowStage(mat => attr => factory(mat)(attr).asScala)).asJava - - def source[T, M](factory: ActorMaterializer => Attributes => Source[T, M]): Source[T, Future[M]] = - Source.fromGraph(new SetupSourceStage(factory)) - - def createSource[T, M]( - factory: ActorMaterializer => Attributes => javadsl.Source[T, M] - ): javadsl.Source[T, Future[M]] = - Source.fromGraph(new SetupSourceStage(mat => attr => factory(mat)(attr).asScala)).asJava -} diff --git a/google-cloud-pub-sub-grpc/src/main/scala/akka/stream/alpakka/googlecloud/pubsub/grpc/javadsl/GooglePubSub.scala b/google-cloud-pub-sub-grpc/src/main/scala/akka/stream/alpakka/googlecloud/pubsub/grpc/javadsl/GooglePubSub.scala index 059e5d37ee..aeb712ecb7 100644 --- a/google-cloud-pub-sub-grpc/src/main/scala/akka/stream/alpakka/googlecloud/pubsub/grpc/javadsl/GooglePubSub.scala +++ b/google-cloud-pub-sub-grpc/src/main/scala/akka/stream/alpakka/googlecloud/pubsub/grpc/javadsl/GooglePubSub.scala @@ -8,16 +8,11 @@ import java.time.Duration import java.util.concurrent.{CompletableFuture, CompletionStage} import akka.actor.Cancellable -import akka.dispatch.ExecutionContexts -import akka.stream.alpakka.googlecloud.pubsub.grpc.impl.Setup import akka.stream.{ActorMaterializer, Attributes} import akka.stream.javadsl.{Flow, Keep, Sink, Source} import akka.{Done, NotUsed} import com.google.pubsub.v1._ -import scala.compat.java8.FutureConverters._ -import scala.concurrent.Future - /** * Google Pub/Sub Akka Stream operator factory. */ @@ -30,13 +25,14 @@ object GooglePubSub { * @param parallelism controls how many messages can be in-flight at any given time */ def publish(parallelism: Int): Flow[PublishRequest, PublishResponse, NotUsed] = - Setup - .createFlow { implicit mat => implicit attr => - Flow - .create[PublishRequest] - .mapAsyncUnordered(parallelism, javaFunction(publisher().client.publish)) + Flow + .setup { + case (mat, attr) => + Flow + .create[PublishRequest] + .mapAsyncUnordered(parallelism, japiFunction(publisher(mat, attr).client.publish)) } - .mapMaterializedValue(javaFunction(_ => NotUsed)) + .mapMaterializedValue(japiFunction(_ => NotUsed)) /** * Create a source that emits messages for a given subscription. @@ -48,30 +44,31 @@ object GooglePubSub { */ def subscribe(request: StreamingPullRequest, pollInterval: Duration): Source[ReceivedMessage, CompletableFuture[Cancellable]] = - Setup - .createSource { implicit mat => implicit attr => - val cancellable = new CompletableFuture[Cancellable]() + Source + .setup { + case (mat, attr) => + val cancellable = new CompletableFuture[Cancellable]() - val subsequentRequest = request.toBuilder - .setSubscription("") - .setStreamAckDeadlineSeconds(0) - .build() + val subsequentRequest = request.toBuilder + .setSubscription("") + .setStreamAckDeadlineSeconds(0) + .build() - subscriber().client - .streamingPull( - Source - .single(request) - .concat( - Source - .tick(pollInterval, pollInterval, subsequentRequest) - .mapMaterializedValue(javaFunction(cancellable.complete)) - ) - ) - .mapConcat(javaFunction(_.getReceivedMessagesList)) - .mapMaterializedValue(javaFunction(_ => cancellable)) + subscriber(mat, attr).client + .streamingPull( + Source + .single(request) + .concat( + Source + .tick(pollInterval, pollInterval, subsequentRequest) + .mapMaterializedValue(japiFunction(cancellable.complete)) + ) + ) + .mapConcat(japiFunction(_.getReceivedMessagesList)) + .mapMaterializedValue(japiFunction(_ => cancellable)) } - .mapMaterializedValue(javaFunction(flattenFutureCs)) - .mapMaterializedValue(javaFunction(_.toCompletableFuture)) + .mapMaterializedValue(japiFunction(flattenCs)) + .mapMaterializedValue(japiFunction(_.toCompletableFuture)) /** * Create a sink that accepts consumed message acknowledgements. @@ -81,36 +78,37 @@ object GooglePubSub { * @param parallelism controls how many acknowledgements can be in-flight at any given time */ def acknowledge(parallelism: Int): Sink[AcknowledgeRequest, CompletionStage[Done]] = - Setup - .createSink { implicit mat => implicit attr => - Flow - .create[AcknowledgeRequest] - .mapAsyncUnordered(parallelism, javaFunction(subscriber().client.acknowledge)) - .toMat(Sink.ignore(), Keep.right[NotUsed, CompletionStage[Done]]) + Sink + .setup { + case (mat, attr) => + Flow + .create[AcknowledgeRequest] + .mapAsyncUnordered(parallelism, japiFunction(subscriber(mat, attr).client.acknowledge)) + .toMat(Sink.ignore(), Keep.right[NotUsed, CompletionStage[Done]]) } - .mapMaterializedValue(javaFunction(flattenFutureCs)) + .mapMaterializedValue(japiFunction(flattenCs)) /** * Helper for creating akka.japi.function.Function instances from Scala * functions as Scala 2.11 does not know about SAMs. */ - private def javaFunction[A, B](f: A => B): akka.japi.function.Function[A, B] = + private def japiFunction[A, B](f: A => B): akka.japi.function.Function[A, B] = new akka.japi.function.Function[A, B]() { override def apply(a: A): B = f(a) } - private def flattenFutureCs[T](f: Future[CompletionStage[T]]): CompletionStage[T] = - f.map(_.toScala)(ExecutionContexts.sameThreadExecutionContext) - .flatMap(identity)(ExecutionContexts.sameThreadExecutionContext) - .toJava + private def flattenCs[T](f: CompletionStage[_ <: CompletionStage[T]]): CompletionStage[T] = + f.thenCompose(new java.util.function.Function[CompletionStage[T], CompletionStage[T]] { + override def apply(t: CompletionStage[T]): CompletionStage[T] = t + }) - private def publisher()(implicit mat: ActorMaterializer, attr: Attributes) = + private def publisher(mat: ActorMaterializer, attr: Attributes) = attr .get[PubSubAttributes.Publisher] .map(_.publisher) .getOrElse(GrpcPublisherExt()(mat.system).publisher) - private def subscriber()(implicit mat: ActorMaterializer, attr: Attributes) = + private def subscriber(mat: ActorMaterializer, attr: Attributes) = attr .get[PubSubAttributes.Subscriber] .map(_.subscriber) diff --git a/google-cloud-pub-sub-grpc/src/main/scala/akka/stream/alpakka/googlecloud/pubsub/grpc/scaladsl/GooglePubSub.scala b/google-cloud-pub-sub-grpc/src/main/scala/akka/stream/alpakka/googlecloud/pubsub/grpc/scaladsl/GooglePubSub.scala index bf8abf9c84..41709e4c0e 100644 --- a/google-cloud-pub-sub-grpc/src/main/scala/akka/stream/alpakka/googlecloud/pubsub/grpc/scaladsl/GooglePubSub.scala +++ b/google-cloud-pub-sub-grpc/src/main/scala/akka/stream/alpakka/googlecloud/pubsub/grpc/scaladsl/GooglePubSub.scala @@ -7,7 +7,6 @@ package akka.stream.alpakka.googlecloud.pubsub.grpc.scaladsl import akka.actor.Cancellable import akka.dispatch.ExecutionContexts import akka.stream.{ActorMaterializer, Attributes} -import akka.stream.alpakka.googlecloud.pubsub.grpc.impl.Setup import akka.stream.scaladsl.{Flow, Keep, Sink, Source} import akka.{Done, NotUsed} import com.google.pubsub.v1.pubsub._ @@ -27,10 +26,11 @@ object GooglePubSub { * @param parallelism controls how many messages can be in-flight at any given time */ def publish(parallelism: Int): Flow[PublishRequest, PublishResponse, NotUsed] = - Setup - .flow { implicit mat => implicit attr => - Flow[PublishRequest] - .mapAsyncUnordered(parallelism)(publisher().client.publish) + Flow + .setup { + case (mat, attr) => + Flow[PublishRequest] + .mapAsyncUnordered(parallelism)(publisher(mat, attr).client.publish) } .mapMaterializedValue(_ => NotUsed) @@ -46,27 +46,28 @@ object GooglePubSub { request: StreamingPullRequest, pollInterval: FiniteDuration ): Source[ReceivedMessage, Future[Cancellable]] = - Setup - .source { implicit mat => implicit attr => - val cancellable = Promise[Cancellable] + Source + .setup { + case (mat, attr) => + val cancellable = Promise[Cancellable] - val subsequentRequest = request - .withSubscription("") - .withStreamAckDeadlineSeconds(0) + val subsequentRequest = request + .withSubscription("") + .withStreamAckDeadlineSeconds(0) - subscriber().client - .streamingPull( - Source - .single(request) - .concat( - Source - .tick(0.seconds, pollInterval, ()) - .map(_ => subsequentRequest) - .mapMaterializedValue(cancellable.success) - ) - ) - .mapConcat(_.receivedMessages.toVector) - .mapMaterializedValue(_ => cancellable.future) + subscriber(mat, attr).client + .streamingPull( + Source + .single(request) + .concat( + Source + .tick(0.seconds, pollInterval, ()) + .map(_ => subsequentRequest) + .mapMaterializedValue(cancellable.success) + ) + ) + .mapConcat(_.receivedMessages.toVector) + .mapMaterializedValue(_ => cancellable.future) } .mapMaterializedValue(_.flatMap(identity)(ExecutionContexts.sameThreadExecutionContext)) @@ -78,21 +79,22 @@ object GooglePubSub { * @param parallelism controls how many acknowledgements can be in-flight at any given time */ def acknowledge(parallelism: Int): Sink[AcknowledgeRequest, Future[Done]] = - Setup - .sink { implicit mat => implicit attr => - Flow[AcknowledgeRequest] - .mapAsyncUnordered(parallelism)(subscriber().client.acknowledge) - .toMat(Sink.ignore)(Keep.right) + Sink + .setup { + case (mat, attr) => + Flow[AcknowledgeRequest] + .mapAsyncUnordered(parallelism)(subscriber(mat, attr).client.acknowledge) + .toMat(Sink.ignore)(Keep.right) } .mapMaterializedValue(_.flatMap(identity)(ExecutionContexts.sameThreadExecutionContext)) - private def publisher()(implicit mat: ActorMaterializer, attr: Attributes) = + private def publisher(mat: ActorMaterializer, attr: Attributes) = attr .get[PubSubAttributes.Publisher] .map(_.publisher) .getOrElse(GrpcPublisherExt()(mat.system).publisher) - private def subscriber()(implicit mat: ActorMaterializer, attr: Attributes) = + private def subscriber(mat: ActorMaterializer, attr: Attributes) = attr .get[PubSubAttributes.Subscriber] .map(_.subscriber) diff --git a/google-fcm/src/main/scala/akka/stream/alpakka/google/firebase/fcm/impl/FcmFlows.scala b/google-fcm/src/main/scala/akka/stream/alpakka/google/firebase/fcm/impl/FcmFlows.scala index 0f6816ae02..4ed0d36990 100644 --- a/google-fcm/src/main/scala/akka/stream/alpakka/google/firebase/fcm/impl/FcmFlows.scala +++ b/google-fcm/src/main/scala/akka/stream/alpakka/google/firebase/fcm/impl/FcmFlows.scala @@ -19,35 +19,39 @@ private[fcm] object FcmFlows { private[fcm] def fcmWithData[T](conf: FcmSettings, sender: FcmSender): Flow[(FcmNotification, T), (FcmResponse, T), NotUsed] = - Setup - .flow { implicit materializer => _ => - import materializer.executionContext - val http = Http()(materializer.system) - val session: GoogleSession = new GoogleSession(conf.clientEmail, conf.privateKey, new GoogleTokenApi(http)) - Flow[(FcmNotification, T)] - .mapAsync(conf.maxConcurrentConnections)( - in => - session.getToken().flatMap { token => - sender.send(conf.projectId, token, http, FcmSend(conf.isTest, in._1)).zip(Future.successful(in._2)) - } - ) + Flow + .setup { + case (materializer, _) => + import materializer.executionContext + val http = Http()(materializer.system) + val session: GoogleSession = new GoogleSession(conf.clientEmail, conf.privateKey, new GoogleTokenApi(http)) + Flow[(FcmNotification, T)] + .mapAsync(conf.maxConcurrentConnections)( + in => + session.getToken()(materializer).flatMap { token => + sender + .send(conf.projectId, token, http, FcmSend(conf.isTest, in._1))(materializer) + .zip(Future.successful(in._2)) + } + ) } .mapMaterializedValue(_ => NotUsed) private[fcm] def fcm(conf: FcmSettings, sender: FcmSender): Flow[FcmNotification, FcmResponse, NotUsed] = - Setup - .flow { implicit materializer => _ => - import materializer.executionContext - val http = Http()(materializer.system) - val session: GoogleSession = new GoogleSession(conf.clientEmail, conf.privateKey, new GoogleTokenApi(http)) - val sender: FcmSender = new FcmSender() - Flow[FcmNotification] - .mapAsync(conf.maxConcurrentConnections)( - in => - session.getToken().flatMap { token => - sender.send(conf.projectId, token, http, FcmSend(conf.isTest, in)) - } - ) + Flow + .setup { + case (materializer, _) => + import materializer.executionContext + val http = Http()(materializer.system) + val session: GoogleSession = new GoogleSession(conf.clientEmail, conf.privateKey, new GoogleTokenApi(http)) + val sender: FcmSender = new FcmSender() + Flow[FcmNotification] + .mapAsync(conf.maxConcurrentConnections)( + in => + session.getToken()(materializer).flatMap { token => + sender.send(conf.projectId, token, http, FcmSend(conf.isTest, in))(materializer) + } + ) } .mapMaterializedValue(_ => NotUsed) } diff --git a/google-fcm/src/main/scala/akka/stream/alpakka/google/firebase/fcm/impl/SetupStage.scala b/google-fcm/src/main/scala/akka/stream/alpakka/google/firebase/fcm/impl/SetupStage.scala deleted file mode 100644 index dc5279ae72..0000000000 --- a/google-fcm/src/main/scala/akka/stream/alpakka/google/firebase/fcm/impl/SetupStage.scala +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Copyright (C) 2016-2019 Lightbend Inc. - */ - -package akka.stream.alpakka.google.firebase.fcm.impl - -import akka.stream._ -import akka.stream.scaladsl.{Flow, Keep, Sink, Source} -import akka.stream.stage._ - -import scala.concurrent.{Future, Promise} - -private final class SetupFlowStage[T, U, M](factory: ActorMaterializer => Attributes => Flow[T, U, M]) - extends GraphStageWithMaterializedValue[FlowShape[T, U], Future[M]] { - - private val in = Inlet[T]("SetupFlowStage.in") - private val out = Outlet[U]("SetupFlowStage.out") - override val shape = FlowShape(in, out) - - override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[M]) = { - val matPromise = Promise[M] - (createStageLogic(matPromise), matPromise.future) - } - - private def createStageLogic(matPromise: Promise[M]) = new GraphStageLogic(shape) { - import SetupStage._ - - val subInlet = new SubSinkInlet[U]("SetupFlowStage") - val subOutlet = new SubSourceOutlet[T]("SetupFlowStage") - - subInlet.setHandler(delegateToOutlet(push(out, _: U), () => complete(out), fail(out, _), subInlet)) - subOutlet.setHandler(delegateToInlet(() => pull(in), () => cancel(in))) - - setHandler(in, delegateToSubOutlet(() => grab(in), subOutlet)) - setHandler(out, delegateToSubInlet(subInlet)) - - override def preStart(): Unit = { - val flow = factory(actorMaterializer(materializer))(attributes) - - val mat = Source - .fromGraph(subOutlet.source) - .viaMat(flow.withAttributes(attributes))(Keep.right) - .to(Sink.fromGraph(subInlet.sink)) - .run()(subFusingMaterializer) - matPromise.success(mat) - } - } -} - -private object SetupStage { - def delegateToSubOutlet[T](grab: () => T, subOutlet: GraphStageLogic#SubSourceOutlet[T]) = new InHandler { - override def onPush(): Unit = - subOutlet.push(grab()) - override def onUpstreamFinish(): Unit = - subOutlet.complete() - override def onUpstreamFailure(ex: Throwable): Unit = - subOutlet.fail(ex) - } - - def delegateToOutlet[T](push: T => Unit, - complete: () => Unit, - fail: Throwable => Unit, - subInlet: GraphStageLogic#SubSinkInlet[T]) = new InHandler { - override def onPush(): Unit = - push(subInlet.grab()) - override def onUpstreamFinish(): Unit = - complete() - override def onUpstreamFailure(ex: Throwable): Unit = - fail(ex) - } - - def delegateToSubInlet[T](subInlet: GraphStageLogic#SubSinkInlet[T]) = new OutHandler { - override def onPull(): Unit = - subInlet.pull() - override def onDownstreamFinish(): Unit = - subInlet.cancel() - } - - def delegateToInlet(pull: () => Unit, cancel: () => Unit) = new OutHandler { - override def onPull(): Unit = - pull() - override def onDownstreamFinish(): Unit = - cancel() - } - - def actorMaterializer(mat: Materializer): ActorMaterializer = mat match { - case am: ActorMaterializer => am - case _ => throw new Error("ActorMaterializer required") - } -} - -private object Setup { - - def flow[T, U, M](factory: ActorMaterializer => Attributes => Flow[T, U, M]): Flow[T, U, Future[M]] = - Flow.fromGraph(new SetupFlowStage(factory)) - -} diff --git a/google-fcm/src/test/scala/akka/stream/alpakka/google/firebase/fcm/impl/FcmSenderSpec.scala b/google-fcm/src/test/scala/akka/stream/alpakka/google/firebase/fcm/impl/FcmSenderSpec.scala index 50b8a1305a..6474366a12 100644 --- a/google-fcm/src/test/scala/akka/stream/alpakka/google/firebase/fcm/impl/FcmSenderSpec.scala +++ b/google-fcm/src/test/scala/akka/stream/alpakka/google/firebase/fcm/impl/FcmSenderSpec.scala @@ -18,7 +18,7 @@ import org.mockito.ArgumentCaptor import org.mockito.ArgumentMatchers.any import org.mockito.Mockito.{verify, when} import org.scalatest.concurrent.ScalaFutures -import org.scalatest.mockito.MockitoSugar +import org.scalatestplus.mockito.MockitoSugar import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike} import scala.concurrent.duration._ diff --git a/google-fcm/src/test/scala/akka/stream/alpakka/google/firebase/fcm/impl/GoogleTokenApiSpec.scala b/google-fcm/src/test/scala/akka/stream/alpakka/google/firebase/fcm/impl/GoogleTokenApiSpec.scala index b9cf18e78a..b3ee16735f 100644 --- a/google-fcm/src/test/scala/akka/stream/alpakka/google/firebase/fcm/impl/GoogleTokenApiSpec.scala +++ b/google-fcm/src/test/scala/akka/stream/alpakka/google/firebase/fcm/impl/GoogleTokenApiSpec.scala @@ -17,7 +17,7 @@ import org.mockito.ArgumentCaptor import org.mockito.ArgumentMatchers.any import org.mockito.Mockito.{verify, when} import org.scalatest.concurrent.ScalaFutures -import org.scalatest.mockito.MockitoSugar +import org.scalatestplus.mockito.MockitoSugar import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike} import pdi.jwt.{Jwt, JwtAlgorithm} diff --git a/kudu/src/main/scala/akka/stream/alpakka/kudu/impl/SetupStage.scala b/kudu/src/main/scala/akka/stream/alpakka/kudu/impl/SetupStage.scala deleted file mode 100644 index f0e8a2e9a9..0000000000 --- a/kudu/src/main/scala/akka/stream/alpakka/kudu/impl/SetupStage.scala +++ /dev/null @@ -1,160 +0,0 @@ -/* - * Copyright (C) 2016-2019 Lightbend Inc. - */ - -package akka.stream.alpakka.kudu.impl - -import akka.annotation.InternalApi -import akka.stream._ -import akka.stream.scaladsl.{Flow, Keep, Sink, Source} -import akka.stream.stage._ - -import scala.concurrent.{Future, Promise} - -private final class SetupSinkStage[T, M](factory: ActorMaterializer => Attributes => Sink[T, M]) - extends GraphStageWithMaterializedValue[SinkShape[T], Future[M]] { - - private val in = Inlet[T]("SetupSinkStage.in") - override val shape = SinkShape(in) - - override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[M]) = { - val matPromise = Promise[M] - (createStageLogic(matPromise), matPromise.future) - } - - private def createStageLogic(matPromise: Promise[M]) = new GraphStageLogic(shape) { - import SetupStage._ - - val subOutlet = new SubSourceOutlet[T]("SetupSinkStage") - subOutlet.setHandler(delegateToInlet(() => pull(in), () => cancel(in))) - setHandler(in, delegateToSubOutlet(() => grab(in), subOutlet)) - - override def preStart(): Unit = { - val sink = factory(actorMaterializer(materializer))(attributes) - - val mat = Source.fromGraph(subOutlet.source).runWith(sink.withAttributes(attributes))(subFusingMaterializer) - matPromise.success(mat) - } - } - -} - -private final class SetupFlowStage[T, U, M](factory: ActorMaterializer => Attributes => Flow[T, U, M]) - extends GraphStageWithMaterializedValue[FlowShape[T, U], Future[M]] { - - private val in = Inlet[T]("SetupFlowStage.in") - private val out = Outlet[U]("SetupFlowStage.out") - override val shape = FlowShape(in, out) - - override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[M]) = { - val matPromise = Promise[M] - (createStageLogic(matPromise), matPromise.future) - } - - private def createStageLogic(matPromise: Promise[M]) = new GraphStageLogic(shape) { - import SetupStage._ - - val subInlet = new SubSinkInlet[U]("SetupFlowStage") - val subOutlet = new SubSourceOutlet[T]("SetupFlowStage") - - subInlet.setHandler(delegateToOutlet(push(out, _: U), () => complete(out), fail(out, _), subInlet)) - subOutlet.setHandler(delegateToInlet(() => pull(in), () => cancel(in))) - - setHandler(in, delegateToSubOutlet(() => grab(in), subOutlet)) - setHandler(out, delegateToSubInlet(subInlet)) - - override def preStart(): Unit = { - val flow = factory(actorMaterializer(materializer))(attributes) - - val mat = Source - .fromGraph(subOutlet.source) - .viaMat(flow.withAttributes(attributes))(Keep.right) - .to(Sink.fromGraph(subInlet.sink)) - .run()(subFusingMaterializer) - matPromise.success(mat) - } - } -} - -private final class SetupSourceStage[T, M](factory: ActorMaterializer => Attributes => Source[T, M]) - extends GraphStageWithMaterializedValue[SourceShape[T], Future[M]] { - - private val out = Outlet[T]("SetupSourceStage.out") - override val shape = SourceShape(out) - - override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[M]) = { - val matPromise = Promise[M] - (createStageLogic(matPromise), matPromise.future) - } - - private def createStageLogic(matPromise: Promise[M]) = new GraphStageLogic(shape) { - import SetupStage._ - - val subInlet = new SubSinkInlet[T]("SetupSourceStage") - subInlet.setHandler(delegateToOutlet(push(out, _: T), () => complete(out), fail(out, _), subInlet)) - setHandler(out, delegateToSubInlet(subInlet)) - - override def preStart(): Unit = { - val source = factory(actorMaterializer(materializer))(attributes) - - val mat = source - .withAttributes(attributes) - .to(Sink.fromGraph(subInlet.sink)) - .run()(subFusingMaterializer) - matPromise.success(mat) - } - } -} - -private object SetupStage { - def delegateToSubOutlet[T](grab: () => T, subOutlet: GraphStageLogic#SubSourceOutlet[T]) = new InHandler { - override def onPush(): Unit = - subOutlet.push(grab()) - override def onUpstreamFinish(): Unit = - subOutlet.complete() - override def onUpstreamFailure(ex: Throwable): Unit = - subOutlet.fail(ex) - } - - def delegateToOutlet[T](push: T => Unit, - complete: () => Unit, - fail: Throwable => Unit, - subInlet: GraphStageLogic#SubSinkInlet[T]) = new InHandler { - override def onPush(): Unit = - push(subInlet.grab()) - override def onUpstreamFinish(): Unit = - complete() - override def onUpstreamFailure(ex: Throwable): Unit = - fail(ex) - } - - def delegateToSubInlet[T](subInlet: GraphStageLogic#SubSinkInlet[T]) = new OutHandler { - override def onPull(): Unit = - subInlet.pull() - override def onDownstreamFinish(): Unit = - subInlet.cancel() - } - - def delegateToInlet(pull: () => Unit, cancel: () => Unit) = new OutHandler { - override def onPull(): Unit = - pull() - override def onDownstreamFinish(): Unit = - cancel() - } - - def actorMaterializer(mat: Materializer): ActorMaterializer = mat match { - case am: ActorMaterializer => am - case _ => throw new Error("ActorMaterializer required") - } -} - -@InternalApi private[kudu] object Setup { - def sink[T, M](factory: ActorMaterializer => Attributes => Sink[T, M]): Sink[T, Future[M]] = - Sink.fromGraph(new SetupSinkStage(factory)) - - def flow[T, U, M](factory: ActorMaterializer => Attributes => Flow[T, U, M]): Flow[T, U, Future[M]] = - Flow.fromGraph(new SetupFlowStage(factory)) - - def source[T, M](factory: ActorMaterializer => Attributes => Source[T, M]): Source[T, Future[M]] = - Source.fromGraph(new SetupSourceStage(factory)) -} diff --git a/kudu/src/main/scala/akka/stream/alpakka/kudu/scaladsl/KuduTable.scala b/kudu/src/main/scala/akka/stream/alpakka/kudu/scaladsl/KuduTable.scala index 1bddaf7e99..2b1d2c33c9 100644 --- a/kudu/src/main/scala/akka/stream/alpakka/kudu/scaladsl/KuduTable.scala +++ b/kudu/src/main/scala/akka/stream/alpakka/kudu/scaladsl/KuduTable.scala @@ -6,7 +6,7 @@ package akka.stream.alpakka.kudu.scaladsl import akka.stream.{ActorMaterializer, Attributes} import akka.stream.alpakka.kudu.{KuduAttributes, KuduClientExt, KuduTableSettings} -import akka.stream.alpakka.kudu.impl.{KuduFlowStage, Setup} +import akka.stream.alpakka.kudu.impl.KuduFlowStage import akka.stream.scaladsl.{Flow, Keep, Sink} import akka.{Done, NotUsed} @@ -27,13 +27,14 @@ object KuduTable { * Create a Flow writing elements to a Kudu table. */ def flow[A](settings: KuduTableSettings[A]): Flow[A, A, NotUsed] = - Setup - .flow { implicit mat => implicit attr => - Flow.fromGraph(new KuduFlowStage[A](settings, client())) + Flow + .setup { + case (mat, attr) => + Flow.fromGraph(new KuduFlowStage[A](settings, client(mat, attr))) } .mapMaterializedValue(_ => NotUsed) - private def client()(implicit mat: ActorMaterializer, attr: Attributes) = + private def client(mat: ActorMaterializer, attr: Attributes) = attr .get[KuduAttributes.Client] .map(_.client) diff --git a/reference/src/main/scala/akka/stream/alpakka/reference/impl/SetupStage.scala b/reference/src/main/scala/akka/stream/alpakka/reference/impl/SetupStage.scala deleted file mode 100644 index 214243b34e..0000000000 --- a/reference/src/main/scala/akka/stream/alpakka/reference/impl/SetupStage.scala +++ /dev/null @@ -1,163 +0,0 @@ -/* - * Copyright (C) 2016-2019 Lightbend Inc. - */ - -package akka.stream.alpakka.reference.impl - -import akka.annotation.InternalApi -import akka.stream._ -import akka.stream.scaladsl.{Flow, Keep, Sink, Source} -import akka.stream.stage._ - -import scala.concurrent.{Future, Promise} - -// This will be removed from Alpakka project once it is merged to Akka. -// https://github.com/akka/akka/pull/26477 - -private final class SetupSinkStage[T, M](factory: ActorMaterializer => Attributes => Sink[T, M]) - extends GraphStageWithMaterializedValue[SinkShape[T], Future[M]] { - - private val in = Inlet[T]("SetupSinkStage.in") - override val shape = SinkShape(in) - - override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[M]) = { - val matPromise = Promise[M] - (createStageLogic(matPromise), matPromise.future) - } - - private def createStageLogic(matPromise: Promise[M]) = new GraphStageLogic(shape) { - import SetupStage._ - - val subOutlet = new SubSourceOutlet[T]("SetupSinkStage") - subOutlet.setHandler(delegateToInlet(() => pull(in), () => cancel(in))) - setHandler(in, delegateToSubOutlet(() => grab(in), subOutlet)) - - override def preStart(): Unit = { - val sink = factory(actorMaterializer(materializer))(attributes) - - val mat = Source.fromGraph(subOutlet.source).runWith(sink.withAttributes(attributes))(subFusingMaterializer) - matPromise.success(mat) - } - } - -} - -private final class SetupFlowStage[T, U, M](factory: ActorMaterializer => Attributes => Flow[T, U, M]) - extends GraphStageWithMaterializedValue[FlowShape[T, U], Future[M]] { - - private val in = Inlet[T]("SetupFlowStage.in") - private val out = Outlet[U]("SetupFlowStage.out") - override val shape = FlowShape(in, out) - - override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[M]) = { - val matPromise = Promise[M] - (createStageLogic(matPromise), matPromise.future) - } - - private def createStageLogic(matPromise: Promise[M]) = new GraphStageLogic(shape) { - import SetupStage._ - - val subInlet = new SubSinkInlet[U]("SetupFlowStage") - val subOutlet = new SubSourceOutlet[T]("SetupFlowStage") - - subInlet.setHandler(delegateToOutlet(push(out, _: U), () => complete(out), fail(out, _), subInlet)) - subOutlet.setHandler(delegateToInlet(() => pull(in), () => cancel(in))) - - setHandler(in, delegateToSubOutlet(() => grab(in), subOutlet)) - setHandler(out, delegateToSubInlet(subInlet)) - - override def preStart(): Unit = { - val flow = factory(actorMaterializer(materializer))(attributes) - - val mat = Source - .fromGraph(subOutlet.source) - .viaMat(flow.withAttributes(attributes))(Keep.right) - .to(Sink.fromGraph(subInlet.sink)) - .run()(subFusingMaterializer) - matPromise.success(mat) - } - } -} - -private final class SetupSourceStage[T, M](factory: ActorMaterializer => Attributes => Source[T, M]) - extends GraphStageWithMaterializedValue[SourceShape[T], Future[M]] { - - private val out = Outlet[T]("SetupSourceStage.out") - override val shape = SourceShape(out) - - override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[M]) = { - val matPromise = Promise[M] - (createStageLogic(matPromise), matPromise.future) - } - - private def createStageLogic(matPromise: Promise[M]) = new GraphStageLogic(shape) { - import SetupStage._ - - val subInlet = new SubSinkInlet[T]("SetupSourceStage") - subInlet.setHandler(delegateToOutlet(push(out, _: T), () => complete(out), fail(out, _), subInlet)) - setHandler(out, delegateToSubInlet(subInlet)) - - override def preStart(): Unit = { - val source = factory(actorMaterializer(materializer))(attributes) - - val mat = source - .withAttributes(attributes) - .to(Sink.fromGraph(subInlet.sink)) - .run()(subFusingMaterializer) - matPromise.success(mat) - } - } -} - -private object SetupStage { - def delegateToSubOutlet[T](grab: () => T, subOutlet: GraphStageLogic#SubSourceOutlet[T]) = new InHandler { - override def onPush(): Unit = - subOutlet.push(grab()) - override def onUpstreamFinish(): Unit = - subOutlet.complete() - override def onUpstreamFailure(ex: Throwable): Unit = - subOutlet.fail(ex) - } - - def delegateToOutlet[T](push: T => Unit, - complete: () => Unit, - fail: Throwable => Unit, - subInlet: GraphStageLogic#SubSinkInlet[T]) = new InHandler { - override def onPush(): Unit = - push(subInlet.grab()) - override def onUpstreamFinish(): Unit = - complete() - override def onUpstreamFailure(ex: Throwable): Unit = - fail(ex) - } - - def delegateToSubInlet[T](subInlet: GraphStageLogic#SubSinkInlet[T]) = new OutHandler { - override def onPull(): Unit = - subInlet.pull() - override def onDownstreamFinish(): Unit = - subInlet.cancel() - } - - def delegateToInlet(pull: () => Unit, cancel: () => Unit) = new OutHandler { - override def onPull(): Unit = - pull() - override def onDownstreamFinish(): Unit = - cancel() - } - - def actorMaterializer(mat: Materializer): ActorMaterializer = mat match { - case am: ActorMaterializer => am - case _ => throw new Error("ActorMaterializer required") - } -} - -@InternalApi private[reference] object Setup { - def sink[T, M](factory: ActorMaterializer => Attributes => Sink[T, M]): Sink[T, Future[M]] = - Sink.fromGraph(new SetupSinkStage(factory)) - - def flow[T, U, M](factory: ActorMaterializer => Attributes => Flow[T, U, M]): Flow[T, U, Future[M]] = - Flow.fromGraph(new SetupFlowStage(factory)) - - def source[T, M](factory: ActorMaterializer => Attributes => Source[T, M]): Source[T, Future[M]] = - Source.fromGraph(new SetupSourceStage(factory)) -} diff --git a/reference/src/main/scala/akka/stream/alpakka/reference/scaladsl/Reference.scala b/reference/src/main/scala/akka/stream/alpakka/reference/scaladsl/Reference.scala index 1bda8f0c64..6e12d4e4bb 100644 --- a/reference/src/main/scala/akka/stream/alpakka/reference/scaladsl/Reference.scala +++ b/reference/src/main/scala/akka/stream/alpakka/reference/scaladsl/Reference.scala @@ -7,7 +7,7 @@ package akka.stream.alpakka.reference.scaladsl import akka.actor.ActorSystem import akka.stream.Attributes import akka.{Done, NotUsed} -import akka.stream.alpakka.reference.impl.{ReferenceFlow, ReferenceSource, ReferenceWithResourceFlow, Setup} +import akka.stream.alpakka.reference.impl.{ReferenceFlow, ReferenceSource, ReferenceWithResourceFlow} import akka.stream.alpakka.reference._ import akka.stream.scaladsl.{Flow, Source} @@ -40,13 +40,13 @@ object Reference { * An implementation of a flow that needs access to materializer or attributes during materialization. */ def flowWithResource(): Flow[ReferenceWriteMessage, ReferenceWriteResult, NotUsed] = - Setup - .flow { mat => implicit attr => - implicit val sys = mat.system - Flow.fromGraph(new ReferenceWithResourceFlow(resolveResource())) + Flow + .setup { + case (mat, attr) => + Flow.fromGraph(new ReferenceWithResourceFlow(resolveResource(mat.system, attr))) } .mapMaterializedValue(_ => NotUsed) - private def resolveResource()(implicit sys: ActorSystem, attr: Attributes) = - attr.get[ReferenceResourceValue].map(_.resource).getOrElse(ResourceExt().resource) + private def resolveResource(sys: ActorSystem, attr: Attributes) = + attr.get[ReferenceResourceValue].map(_.resource).getOrElse(ResourceExt()(sys).resource) } diff --git a/s3/src/main/scala/akka/stream/alpakka/s3/impl/S3Stream.scala b/s3/src/main/scala/akka/stream/alpakka/s3/impl/S3Stream.scala index f75040eb5b..b40cd495b1 100644 --- a/s3/src/main/scala/akka/stream/alpakka/s3/impl/S3Stream.scala +++ b/s3/src/main/scala/akka/stream/alpakka/s3/impl/S3Stream.scala @@ -111,18 +111,21 @@ import akka.util.ByteString ): Source[Option[(Source[ByteString, NotUsed], ObjectMetadata)], NotUsed] = { val s3Headers = sse.toIndexedSeq.flatMap(_.headersFor(GetObject)) - Setup.source { implicit mat => _ => - request(s3Location, rangeOption = range, versionId = versionId, s3Headers = s3Headers) - .map(response => response.withEntity(response.entity.withoutSizeLimit)) - .mapAsync(parallelism = 1)(entityForSuccess) - .map { - case (entity, headers) => - Option((entity.dataBytes.mapMaterializedValue(_ => NotUsed), computeMetaData(headers, entity))) - } - .recover[Option[(Source[ByteString, NotUsed], ObjectMetadata)]] { - case e: S3Exception if e.code == "NoSuchKey" => None - } - } + Source + .setup { + case (mat, attr) => + implicit val materializer = mat + request(s3Location, rangeOption = range, versionId = versionId, s3Headers = s3Headers) + .map(response => response.withEntity(response.entity.withoutSizeLimit)) + .mapAsync(parallelism = 1)(entityForSuccess) + .map { + case (entity, headers) => + Option((entity.dataBytes.mapMaterializedValue(_ => NotUsed), computeMetaData(headers, entity))) + } + .recover[Option[(Source[ByteString, NotUsed], ObjectMetadata)]] { + case e: S3Exception if e.code == "NoSuchKey" => None + } + } }.mapMaterializedValue(_ => NotUsed) def listBucket(bucket: String, prefix: Option[String] = None): Source[ListBucketResultContents, NotUsed] = { @@ -136,8 +139,7 @@ import akka.util.ByteString )(implicit mat: ActorMaterializer, attr: Attributes): Future[Option[(ListBucketState, Seq[ListBucketResultContents])]] = { import mat.executionContext - implicit val sys = mat.system - implicit val conf = resolveSettings() + implicit val conf = resolveSettings(attr, mat.system) signAndGetAs[ListBucketResult](HttpRequests.listBucket(bucket, prefix, token)) .map { (res: ListBucketResult) => @@ -150,15 +152,18 @@ import akka.util.ByteString } } - Setup - .source { implicit mat => implicit attr => - Source - .unfoldAsync[ListBucketState, Seq[ListBucketResultContents]](Starting) { - case Finished => Future.successful(None) - case Starting => listBucketCall(None) - case Running(token) => listBucketCall(Some(token)) - } - .mapConcat(identity) + Source + .setup { + case (mat, attr) => + implicit val materializer = mat + implicit val attributes = attr + Source + .unfoldAsync[ListBucketState, Seq[ListBucketResultContents]](Starting) { + case Finished => Future.successful(None) + case Starting => listBucketCall(None) + case Running(token) => listBucketCall(Some(token)) + } + .mapConcat(identity) } .mapMaterializedValue(_ => NotUsed) } @@ -167,43 +172,48 @@ import akka.util.ByteString key: String, versionId: Option[String], sse: Option[ServerSideEncryption]): Source[Option[ObjectMetadata], NotUsed] = - Setup - .source { implicit mat => _ => - import mat.executionContext - val s3Headers = sse.toIndexedSeq.flatMap(_.headersFor(HeadObject)) - request(S3Location(bucket, key), HttpMethods.HEAD, versionId = versionId, s3Headers = s3Headers).flatMapConcat { - case HttpResponse(OK, headers, entity, _) => - Source.fromFuture { - entity.withoutSizeLimit().discardBytes().future().map { _ => - Some(computeMetaData(headers, entity)) - } - } - case HttpResponse(NotFound, _, entity, _) => - Source.fromFuture(entity.discardBytes().future().map(_ => None)) - case HttpResponse(code, _, entity, _) => - Source.fromFuture { - Unmarshal(entity).to[String].map { err => - throw new S3Exception(err, code) - } + Source + .setup { + case (mat, attr) => + implicit val materializer = mat + import mat.executionContext + val s3Headers = sse.toIndexedSeq.flatMap(_.headersFor(HeadObject)) + request(S3Location(bucket, key), HttpMethods.HEAD, versionId = versionId, s3Headers = s3Headers) + .flatMapConcat { + case HttpResponse(OK, headers, entity, _) => + Source.fromFuture { + entity.withoutSizeLimit().discardBytes().future().map { _ => + Some(computeMetaData(headers, entity)) + } + } + case HttpResponse(NotFound, _, entity, _) => + Source.fromFuture(entity.discardBytes().future().map(_ => None)) + case HttpResponse(code, _, entity, _) => + Source.fromFuture { + Unmarshal(entity).to[String].map { err => + throw new S3Exception(err, code) + } + } } - } } .mapMaterializedValue(_ => NotUsed) def deleteObject(s3Location: S3Location, versionId: Option[String]): Source[Done, NotUsed] = - Setup - .source { implicit mat => _ => - import mat.executionContext - request(s3Location, HttpMethods.DELETE, versionId = versionId).flatMapConcat { - case HttpResponse(NoContent, _, entity, _) => - Source.fromFuture(entity.discardBytes().future().map(_ => Done)) - case HttpResponse(code, _, entity, _) => - Source.fromFuture { - Unmarshal(entity).to[String].map { err => - throw new S3Exception(err, code) + Source + .setup { + case (mat, attr) => + implicit val m = mat + import mat.executionContext + request(s3Location, HttpMethods.DELETE, versionId = versionId).flatMapConcat { + case HttpResponse(NoContent, _, entity, _) => + Source.fromFuture(entity.discardBytes().future().map(_ => Done)) + case HttpResponse(code, _, entity, _) => + Source.fromFuture { + Unmarshal(entity).to[String].map { err => + throw new S3Exception(err, code) + } } - } - } + } } .mapMaterializedValue(_ => NotUsed) @@ -224,29 +234,32 @@ import akka.util.ByteString val headers = s3Headers.headersFor(PutObject) - Setup - .source { implicit mat => implicit attr => - import mat.executionContext - implicit val sys = mat.system - implicit val conf = resolveSettings() - - val req = uploadRequest(s3Location, data, contentLength, contentType, headers) - - signAndRequest(req) - .flatMapConcat { - case HttpResponse(OK, h, entity, _) => - Source.fromFuture { - entity.discardBytes().future().map { _ => - ObjectMetadata(h :+ `Content-Length`(entity.contentLengthOption.getOrElse(0))) + Source + .setup { + case (mat, attr) => + implicit val materializer = mat + implicit val attributes = attr + import mat.executionContext + implicit val sys = mat.system + implicit val conf = resolveSettings(attr, mat.system) + + val req = uploadRequest(s3Location, data, contentLength, contentType, headers) + + signAndRequest(req) + .flatMapConcat { + case HttpResponse(OK, h, entity, _) => + Source.fromFuture { + entity.discardBytes().future().map { _ => + ObjectMetadata(h :+ `Content-Length`(entity.contentLengthOption.getOrElse(0))) + } } - } - case HttpResponse(code, _, entity, _) => - Source.fromFuture { - Unmarshal(entity).to[String].map { err => - throw new S3Exception(err, code) + case HttpResponse(code, _, entity, _) => + Source.fromFuture { + Unmarshal(entity).to[String].map { err => + throw new S3Exception(err, code) + } } - } - } + } } .mapMaterializedValue(_ => NotUsed) } @@ -256,11 +269,13 @@ import akka.util.ByteString rangeOption: Option[ByteRange] = None, versionId: Option[String] = None, s3Headers: Seq[HttpHeader] = Seq.empty): Source[HttpResponse, NotUsed] = - Setup - .source { mat => implicit attr => - implicit val sys = mat.system - implicit val conf = resolveSettings() - signAndRequest(requestHeaders(getDownloadRequest(s3Location, method, s3Headers, versionId), rangeOption)) + Source + .setup { + case (mat, attr) => + implicit val attributes = attr + implicit val sys = mat.system + implicit val conf = resolveSettings(attr, mat.system) + signAndRequest(requestHeaders(getDownloadRequest(s3Location, method, s3Headers, versionId), rangeOption)) } .mapMaterializedValue(_ => NotUsed) @@ -305,21 +320,23 @@ import akka.util.ByteString method: HttpMethod, process: (HttpResponse, Materializer) => Future[T] ): Source[T, NotUsed] = - Setup - .source { mat => implicit attr => - implicit val sys: ActorSystem = mat.system - implicit val conf: S3Settings = resolveSettings() - - val location = S3Location(bucket = bucket, key = "") - - signAndRequest( - requestHeaders( - HttpRequests.bucketManagementRequest(location, method), - None - ) - ).mapAsync(1) { response => - process(response, mat) - } + Source + .setup { + case (mat, attr) => + implicit val attributes = attr + implicit val sys: ActorSystem = mat.system + implicit val conf: S3Settings = resolveSettings(attr, mat.system) + + val location = S3Location(bucket = bucket, key = "") + + signAndRequest( + requestHeaders( + HttpRequests.bucketManagementRequest(location, method), + None + ) + ).mapAsync(1) { response => + process(response, mat) + } } .mapMaterializedValue(_ => NotUsed) @@ -380,24 +397,27 @@ import akka.util.ByteString private def initiateMultipartUpload(s3Location: S3Location, contentType: ContentType, s3Headers: Seq[HttpHeader]): Source[MultipartUpload, NotUsed] = - Setup - .source { implicit mat => implicit attr => - import mat.executionContext - implicit val sys = mat.system - implicit val conf = resolveSettings() - - val req = initiateMultipartUploadRequest(s3Location, contentType, s3Headers) - - signAndRequest(req).flatMapConcat { - case HttpResponse(status, _, entity, _) if status.isSuccess() => - Source.fromFuture(Unmarshal(entity).to[MultipartUpload]) - case HttpResponse(code, _, entity, _) => - Source.fromFuture { - Unmarshal(entity).to[String].map { err => - throw new S3Exception(err, code) + Source + .setup { + case (mat, attr) => + implicit val materializer = mat + implicit val attributes = attr + import mat.executionContext + implicit val sys = mat.system + implicit val conf = resolveSettings(attr, mat.system) + + val req = initiateMultipartUploadRequest(s3Location, contentType, s3Headers) + + signAndRequest(req).flatMapConcat { + case HttpResponse(status, _, entity, _) if status.isSuccess() => + Source.fromFuture(Unmarshal(entity).to[MultipartUpload]) + case HttpResponse(code, _, entity, _) => + Source.fromFuture { + Unmarshal(entity).to[String].map { err => + throw new S3Exception(err, code) + } } - } - } + } } .mapMaterializedValue(_ => NotUsed) @@ -463,8 +483,7 @@ import akka.util.ByteString } import mat.executionContext - implicit val sys = mat.system - implicit val conf = resolveSettings() + implicit val conf = resolveSettings(attr, mat.system) val headers = sse.toIndexedSeq.flatMap(_.headersFor(UploadPart)) @@ -510,22 +529,22 @@ import akka.util.ByteString val headers = s3Headers.serverSideEncryption.toIndexedSeq.flatMap(_.headersFor(UploadPart)) - Setup - .flow { mat => implicit attr => - implicit val sys = mat.system - implicit val conf = resolveSettings() - - SplitAfterSize(chunkSize, MaxChunkSize)(atLeastOneByteString) - .via(getChunkBuffer(chunkSize)) //creates the chunks - .concatSubstreams - .zipWith(requestInfo) { - case (chunkedPayload, (uploadInfo, chunkIndex)) => - //each of the payload requests are created - val partRequest = - uploadPartRequest(uploadInfo, chunkIndex, chunkedPayload.data, chunkedPayload.size, headers) - (partRequest, (uploadInfo, chunkIndex)) - } - .flatMapConcat { case (req, info) => Signer.signedRequest(req, signingKey).zip(Source.single(info)) } + Flow + .setup { + case (mat, attr) => + implicit val conf = resolveSettings(attr, mat.system) + + SplitAfterSize(chunkSize, MaxChunkSize)(atLeastOneByteString) + .via(getChunkBuffer(chunkSize)) //creates the chunks + .concatSubstreams + .zipWith(requestInfo) { + case (chunkedPayload, (uploadInfo, chunkIndex)) => + //each of the payload requests are created + val partRequest = + uploadPartRequest(uploadInfo, chunkIndex, chunkedPayload.data, chunkedPayload.size, headers) + (partRequest, (uploadInfo, chunkIndex)) + } + .flatMapConcat { case (req, info) => Signer.signedRequest(req, signingKey).zip(Source.single(info)) } } .mapMaterializedValue(_ => NotUsed) } @@ -570,39 +589,41 @@ import akka.util.ByteString val requestFlow = createRequests(s3Location, contentType, s3Headers, chunkSize, parallelism) // The individual upload part requests are processed here - Setup - .flow { implicit mat => implicit attr => - import mat.executionContext - implicit val sys = mat.system - implicit val conf = resolveSettings() - - requestFlow - .via(superPool[(MultipartUpload, Int)]) - .mapAsync(parallelism) { - case (Success(r), (upload, index)) => - if (r.status.isFailure()) { - Unmarshal(r.entity).to[String].map { errorBody => - FailedUploadPart( - upload, - index, - new RuntimeException( - s"Upload part $index request failed. Response header: ($r), response body: ($errorBody)." + Flow + .setup { + case (mat, attr) => + implicit val materializer = mat + import mat.executionContext + implicit val sys = mat.system + implicit val conf = resolveSettings(attr, mat.system) + + requestFlow + .via(superPool[(MultipartUpload, Int)]) + .mapAsync(parallelism) { + case (Success(r), (upload, index)) => + if (r.status.isFailure()) { + Unmarshal(r.entity).to[String].map { errorBody => + FailedUploadPart( + upload, + index, + new RuntimeException( + s"Upload part $index request failed. Response header: ($r), response body: ($errorBody)." + ) + ) + } + } else { + r.entity.dataBytes.runWith(Sink.ignore) + val etag = r.headers.find(_.lowercaseName() == "etag").map(_.value) + etag + .map(t => Future.successful(SuccessfulUploadPart(upload, index, t))) + .getOrElse( + Future + .successful(FailedUploadPart(upload, index, new RuntimeException(s"Cannot find etag in ${r}"))) ) - ) } - } else { - r.entity.dataBytes.runWith(Sink.ignore) - val etag = r.headers.find(_.lowercaseName() == "etag").map(_.value) - etag - .map(t => Future.successful(SuccessfulUploadPart(upload, index, t))) - .getOrElse( - Future - .successful(FailedUploadPart(upload, index, new RuntimeException(s"Cannot find etag in ${r}"))) - ) - } - case (Failure(e), (upload, index)) => Future.successful(FailedUploadPart(upload, index, e)) - } + case (Failure(e), (upload, index)) => Future.successful(FailedUploadPart(upload, index, e)) + } } .mapMaterializedValue(_ => NotUsed) } @@ -611,28 +632,31 @@ import akka.util.ByteString s3Location: S3Location, sse: Option[ServerSideEncryption] ): Sink[UploadPartResponse, Future[MultipartUploadResult]] = - Setup - .sink { implicit mat => implicit attr => - val sys = mat.system - import sys.dispatcher - Sink - .seq[UploadPartResponse] - .mapMaterializedValue { responseFuture: Future[Seq[UploadPartResponse]] => - responseFuture - .flatMap { responses: Seq[UploadPartResponse] => - val successes = responses.collect { case r: SuccessfulUploadPart => r } - val failures = responses.collect { case r: FailedUploadPart => r } - if (responses.isEmpty) { - Future.failed(new RuntimeException("No Responses")) - } else if (failures.isEmpty) { - Future.successful(successes.sortBy(_.index)) - } else { - Future.failed(FailedUpload(failures.map(_.exception))) + Sink + .setup { + case (mat, attr) => + implicit val materializer = mat + implicit val attributes = attr + val sys = mat.system + import sys.dispatcher + Sink + .seq[UploadPartResponse] + .mapMaterializedValue { responseFuture: Future[Seq[UploadPartResponse]] => + responseFuture + .flatMap { responses: Seq[UploadPartResponse] => + val successes = responses.collect { case r: SuccessfulUploadPart => r } + val failures = responses.collect { case r: FailedUploadPart => r } + if (responses.isEmpty) { + Future.failed(new RuntimeException("No Responses")) + } else if (failures.isEmpty) { + Future.successful(successes.sortBy(_.index)) + } else { + Future.failed(FailedUpload(failures.map(_.exception))) + } } - } - .flatMap(completeMultipartUpload(s3Location, _, sse)) - } - .mapMaterializedValue(_.map(r => MultipartUploadResult(r.location, r.bucket, r.key, r.etag, r.versionId))) + .flatMap(completeMultipartUpload(s3Location, _, sse)) + } + .mapMaterializedValue(_.map(r => MultipartUploadResult(r.location, r.bucket, r.key, r.etag, r.versionId))) } .mapMaterializedValue(_.flatMap(identity)(ExecutionContexts.sameThreadExecutionContext)) @@ -666,7 +690,7 @@ import akka.util.ByteString request: HttpRequest, retries: Int = 3 )(implicit sys: ActorSystem, attr: Attributes): Source[HttpResponse, NotUsed] = { - implicit val conf = resolveSettings() + implicit val conf = resolveSettings(attr, sys) Signer .signedRequest(request, signingKey) @@ -717,24 +741,24 @@ import akka.util.ByteString val headers = s3Headers.serverSideEncryption.toIndexedSeq.flatMap(_.headersFor(CopyPart)) - Setup - .source { mat => implicit attr => - implicit val sys = mat.system - implicit val conf = resolveSettings() - - requestInfo - .zipWith(partitions) { - case ((upload, _), ls) => - ls.map { cp => - val multipartCopy = MultipartCopy(upload, cp) - val request = uploadCopyPartRequest(multipartCopy, sourceVersionId, headers) - (request, multipartCopy) - } - } - .mapConcat(identity) - .flatMapConcat { - case (req, info) => Signer.signedRequest(req, signingKey).zip(Source.single(info)) - } + Source + .setup { + case (mat, attr) => + implicit val conf = resolveSettings(attr, mat.system) + + requestInfo + .zipWith(partitions) { + case ((upload, _), ls) => + ls.map { cp => + val multipartCopy = MultipartCopy(upload, cp) + val request = uploadCopyPartRequest(multipartCopy, sourceVersionId, headers) + (request, multipartCopy) + } + } + .mapConcat(identity) + .flatMapConcat { + case (req, info) => Signer.signedRequest(req, signingKey).zip(Source.single(info)) + } } .mapMaterializedValue(_ => NotUsed) } @@ -742,38 +766,42 @@ import akka.util.ByteString private def processUploadCopyPartRequests( requests: Source[(HttpRequest, MultipartCopy), NotUsed] )(parallelism: Int) = - Setup.source { implicit mat => implicit attr => - import mat.executionContext - implicit val sys = mat.system - implicit val settings = resolveSettings() - - requests - .via(superPool[MultipartCopy]) - .map { - case (Success(r), multipartCopy) => - val entity = r.entity - val upload = multipartCopy.multipartUpload - val index = multipartCopy.copyPartition.partNumber - import StatusCodes._ - r.status match { - case OK => - Unmarshal(entity).to[CopyPartResult].map(cp => SuccessfulUploadPart(upload, index, cp.eTag)) - case statusCode: StatusCode => - Unmarshal(entity).to[String].map { err => - val response = Option(err).getOrElse(s"Failed to upload part into S3, status code was: $statusCode") - throw new S3Exception(response, statusCode) + Source + .setup { + case (mat, attr) => + implicit val materializer = mat + import mat.executionContext + implicit val sys = mat.system + implicit val settings = resolveSettings(attr, mat.system) + + requests + .via(superPool[MultipartCopy]) + .map { + case (Success(r), multipartCopy) => + val entity = r.entity + val upload = multipartCopy.multipartUpload + val index = multipartCopy.copyPartition.partNumber + import StatusCodes._ + r.status match { + case OK => + Unmarshal(entity).to[CopyPartResult].map(cp => SuccessfulUploadPart(upload, index, cp.eTag)) + case statusCode: StatusCode => + Unmarshal(entity).to[String].map { err => + val response = + Option(err).getOrElse(s"Failed to upload part into S3, status code was: $statusCode") + throw new S3Exception(response, statusCode) + } } - } - case (Failure(ex), multipartCopy) => - Future.successful( - FailedUploadPart(multipartCopy.multipartUpload, multipartCopy.copyPartition.partNumber, ex) - ) - } - .mapAsync(parallelism)(identity) - } + case (Failure(ex), multipartCopy) => + Future.successful( + FailedUploadPart(multipartCopy.multipartUpload, multipartCopy.copyPartition.partNumber, ex) + ) + } + .mapAsync(parallelism)(identity) + } - private def resolveSettings()(implicit attr: Attributes, sys: ActorSystem) = + private def resolveSettings(attr: Attributes, sys: ActorSystem) = attr .get[S3SettingsValue] .map(_.settings) diff --git a/s3/src/main/scala/akka/stream/alpakka/s3/impl/SetupStage.scala b/s3/src/main/scala/akka/stream/alpakka/s3/impl/SetupStage.scala deleted file mode 100644 index 9142e46fb8..0000000000 --- a/s3/src/main/scala/akka/stream/alpakka/s3/impl/SetupStage.scala +++ /dev/null @@ -1,160 +0,0 @@ -/* - * Copyright (C) 2016-2019 Lightbend Inc. - */ - -package akka.stream.alpakka.s3.impl - -import akka.annotation.InternalApi -import akka.stream._ -import akka.stream.scaladsl.{Flow, Keep, Sink, Source} -import akka.stream.stage._ - -import scala.concurrent.{Future, Promise} - -private final class SetupSinkStage[T, M](factory: ActorMaterializer => Attributes => Sink[T, M]) - extends GraphStageWithMaterializedValue[SinkShape[T], Future[M]] { - - private val in = Inlet[T]("SetupSinkStage.in") - override val shape = SinkShape(in) - - override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[M]) = { - val matPromise = Promise[M] - (createStageLogic(matPromise), matPromise.future) - } - - private def createStageLogic(matPromise: Promise[M]) = new GraphStageLogic(shape) { - import SetupStage._ - - val subOutlet = new SubSourceOutlet[T]("SetupSinkStage") - subOutlet.setHandler(delegateToInlet(() => pull(in), () => cancel(in))) - setHandler(in, delegateToSubOutlet(() => grab(in), subOutlet)) - - override def preStart(): Unit = { - val sink = factory(actorMaterializer(materializer))(attributes) - - val mat = Source.fromGraph(subOutlet.source).runWith(sink.withAttributes(attributes))(subFusingMaterializer) - matPromise.success(mat) - } - } - -} - -private final class SetupFlowStage[T, U, M](factory: ActorMaterializer => Attributes => Flow[T, U, M]) - extends GraphStageWithMaterializedValue[FlowShape[T, U], Future[M]] { - - private val in = Inlet[T]("SetupFlowStage.in") - private val out = Outlet[U]("SetupFlowStage.out") - override val shape = FlowShape(in, out) - - override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[M]) = { - val matPromise = Promise[M] - (createStageLogic(matPromise), matPromise.future) - } - - private def createStageLogic(matPromise: Promise[M]) = new GraphStageLogic(shape) { - import SetupStage._ - - val subInlet = new SubSinkInlet[U]("SetupFlowStage") - val subOutlet = new SubSourceOutlet[T]("SetupFlowStage") - - subInlet.setHandler(delegateToOutlet(push(out, _: U), () => complete(out), fail(out, _), subInlet)) - subOutlet.setHandler(delegateToInlet(() => pull(in), () => cancel(in))) - - setHandler(in, delegateToSubOutlet(() => grab(in), subOutlet)) - setHandler(out, delegateToSubInlet(subInlet)) - - override def preStart(): Unit = { - val flow = factory(actorMaterializer(materializer))(attributes) - - val mat = Source - .fromGraph(subOutlet.source) - .viaMat(flow.withAttributes(attributes))(Keep.right) - .to(Sink.fromGraph(subInlet.sink)) - .run()(subFusingMaterializer) - matPromise.success(mat) - } - } -} - -private final class SetupSourceStage[T, M](factory: ActorMaterializer => Attributes => Source[T, M]) - extends GraphStageWithMaterializedValue[SourceShape[T], Future[M]] { - - private val out = Outlet[T]("SetupSourceStage.out") - override val shape = SourceShape(out) - - override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[M]) = { - val matPromise = Promise[M] - (createStageLogic(matPromise), matPromise.future) - } - - private def createStageLogic(matPromise: Promise[M]) = new GraphStageLogic(shape) { - import SetupStage._ - - val subInlet = new SubSinkInlet[T]("SetupSourceStage") - subInlet.setHandler(delegateToOutlet(push(out, _: T), () => complete(out), fail(out, _), subInlet)) - setHandler(out, delegateToSubInlet(subInlet)) - - override def preStart(): Unit = { - val source = factory(actorMaterializer(materializer))(attributes) - - val mat = source - .withAttributes(attributes) - .to(Sink.fromGraph(subInlet.sink)) - .run()(subFusingMaterializer) - matPromise.success(mat) - } - } -} - -private object SetupStage { - def delegateToSubOutlet[T](grab: () => T, subOutlet: GraphStageLogic#SubSourceOutlet[T]) = new InHandler { - override def onPush(): Unit = - subOutlet.push(grab()) - override def onUpstreamFinish(): Unit = - subOutlet.complete() - override def onUpstreamFailure(ex: Throwable): Unit = - subOutlet.fail(ex) - } - - def delegateToOutlet[T](push: T => Unit, - complete: () => Unit, - fail: Throwable => Unit, - subInlet: GraphStageLogic#SubSinkInlet[T]) = new InHandler { - override def onPush(): Unit = - push(subInlet.grab()) - override def onUpstreamFinish(): Unit = - complete() - override def onUpstreamFailure(ex: Throwable): Unit = - fail(ex) - } - - def delegateToSubInlet[T](subInlet: GraphStageLogic#SubSinkInlet[T]) = new OutHandler { - override def onPull(): Unit = - subInlet.pull() - override def onDownstreamFinish(): Unit = - subInlet.cancel() - } - - def delegateToInlet(pull: () => Unit, cancel: () => Unit) = new OutHandler { - override def onPull(): Unit = - pull() - override def onDownstreamFinish(): Unit = - cancel() - } - - def actorMaterializer(mat: Materializer): ActorMaterializer = mat match { - case am: ActorMaterializer => am - case _ => throw new Error("ActorMaterializer required") - } -} - -@InternalApi private[impl] object Setup { - def sink[T, M](factory: ActorMaterializer => Attributes => Sink[T, M]): Sink[T, Future[M]] = - Sink.fromGraph(new SetupSinkStage(factory)) - - def flow[T, U, M](factory: ActorMaterializer => Attributes => Flow[T, U, M]): Flow[T, U, Future[M]] = - Flow.fromGraph(new SetupFlowStage(factory)) - - def source[T, M](factory: ActorMaterializer => Attributes => Source[T, M]): Source[T, Future[M]] = - Source.fromGraph(new SetupSourceStage(factory)) -}