Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP - Adds Google cloud storage support #650

Closed
wants to merge 9 commits into from
3 changes: 3 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ lazy val modules: Seq[ProjectReference] = Seq(
ftp,
geode,
googleCloudPubSub,
googleCloudStorage,
hbase,
ironmq,
jms,
Expand Down Expand Up @@ -97,6 +98,8 @@ lazy val googleCloudPubSub = alpakkaProject(
parallelExecution in Test := false
)

lazy val googleCloudStorage = alpakkaProject("google-cloud-storage", Dependencies.GoogleStorage)

lazy val hbase = alpakkaProject("hbase", Dependencies.HBase, fork in Test := true)

lazy val ironmq = alpakkaProject("ironmq", Dependencies.IronMq)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright (C) 2016-2018 Lightbend Inc. <http://www.lightbend.com>
*/

package akka.stream.alpakka.googlecloud.storage

import java.nio.file.{Files, Path, Paths}
import java.security.{KeyFactory, PrivateKey}
import java.security.spec.PKCS8EncodedKeySpec
import java.util.Base64

import com.typesafe.config.Config
import spray.json._
import spray.json.DefaultJsonProtocol._

import scala.io.Source

object GoogleAuthConfiguration {

private final case class ServiceAccountFile(project_id: String, private_key: String, client_email: String)

private implicit val serviceAccountFileFormat = jsonFormat3(ServiceAccountFile)

private val configPath = "alpakka.google.serviceAccountFile"

private def readServiceAccountFile(serviceAccountFile: Path): ServiceAccountFile = {
if (Files.notExists(serviceAccountFile)) {
throw new RuntimeException(s"Service account file missing: ${serviceAccountFile.toAbsolutePath}")
}
val bufferedSource = Source.fromFile(serviceAccountFile.toFile)
val contentAsJson = bufferedSource.getLines().mkString.parseJson
bufferedSource.close()
contentAsJson.convertTo[ServiceAccountFile]
}

private def parsePrivateKey(privateKey: String): PrivateKey = {
val pk = privateKey
.replace("-----BEGIN RSA PRIVATE KEY-----\n", "")
.replace("-----END RSA PRIVATE KEY-----", "")
.replace("-----BEGIN PRIVATE KEY-----\n", "")
.replace("-----END PRIVATE KEY-----", "")
.replaceAll(raw"\s", "")
val kf = KeyFactory.getInstance("RSA")
val encodedPv = Base64.getDecoder.decode(pk)
val keySpecPv = new PKCS8EncodedKeySpec(encodedPv)
kf.generatePrivate(keySpecPv)
}

def apply(serviceAccountFile: Path): GoogleAuthConfiguration = {
val serviceAccount = readServiceAccountFile(serviceAccountFile)
val privateKey = parsePrivateKey(serviceAccount.private_key)
GoogleAuthConfiguration(privateKey, serviceAccount.client_email, serviceAccount.project_id)
}

def apply(config: Config): Option[GoogleAuthConfiguration] =
Some(config)
.filter(_.hasPath(configPath))
.map(_.getString(configPath))
.filter(_.trim.nonEmpty)
.map(file => apply(Paths.get(file)))

}

final case class GoogleAuthConfiguration(privateKey: PrivateKey, clientEmail: String, projectId: String)
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright (C) 2016-2018 Lightbend Inc. <http://www.lightbend.com>
*/

package akka.stream.alpakka.googlecloud.storage

import scala.collection.immutable.Seq

object Model {
final case class StorageObject(
kind: String,
id: String,
name: String,
bucket: String,
generation: String,
contentType: Option[String],
size: String,
etag: String
)

// TODO add more from https://github.com/GoogleCloudPlatform/google-cloud-java/blob/master/google-cloud-storage/src/main/java/com/google/cloud/storage/BucketInfo.java
final case class BucketInfo(
/**
* The bucket name
*/
name: String,
/**
* The bucket's location. Data for blobs in the bucket resides in physical storage within
* this region. A list of supported values is available
* <a href="https://cloud.google.com/storage/docs/bucket-locations">here</a>.
*/
location: String,
/**
* The kind of bucket
*/
kind: Option[String] = None,
/**
* THe id of the bucket
*/
id: Option[String] = None
)

final case class BucketListResult(
kind: String,
nextPageToken: Option[String],
prefixes: Option[List[String]],
items: List[StorageObject]
) {
def merge(other: BucketListResult): BucketListResult =
//todo merge prefixes
copy(nextPageToken = None, items = this.items ++ other.items)
}

final class ObjectNotFoundException(err: String) extends RuntimeException(err)

final case class MultiPartUpload(uploadId: String)

sealed trait UploadPartResponse
final case class SuccessfulUploadPart(multiPartUpload: MultiPartUpload, index: Int) extends UploadPartResponse
final case class FailedUploadPart(multiPartUpload: MultiPartUpload, index: Int, exception: Throwable)
extends UploadPartResponse
final case class SuccessfulUpload(multiPartUpload: MultiPartUpload, index: Int, storageObject: StorageObject)
extends UploadPartResponse

final case class FailedUpload(reasons: Seq[Throwable]) extends Exception(reasons.map(_.getMessage).mkString(", "))
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright (C) 2016-2018 Lightbend Inc. <http://www.lightbend.com>
*/

package akka.stream.alpakka.googlecloud.storage.impl

import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.util.ByteString

@akka.annotation.InternalApi
private[storage] final case class Chunk(bytes: ByteString, totalSize: Option[Long] = None) {
def size: Int = bytes.size
}

// Inspired from akka doc : https://doc.akka.io/docs/akka/current/stream/stream-cookbook.html?language=scala#chunking-up-a-stream-of-bytestrings-into-limited-size-bytestrings
@akka.annotation.InternalApi
private[storage] class Chunker(val chunkSize: Int) extends GraphStage[FlowShape[ByteString, Chunk]] {

val in = Inlet[ByteString]("Chunker.in")
val out = Outlet[Chunk]("Chunker.out")
override val shape = FlowShape.of(in, out)

override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
private var buffer = ByteString.empty
private var totalSize = 0l

setHandler(out, new OutHandler {
override def onPull(): Unit =
if (isClosed(in)) emitChunk()
else pull(in)
})
setHandler(
in,
new InHandler {
override def onPush(): Unit = {
val elem = grab(in)
buffer ++= elem
emitChunk()
}

override def onUpstreamFinish(): Unit =
if (isAvailable(out)) emitChunk()
}
)

private def emitChunk(): Unit =
if (isClosed(in)) {
if (buffer.nonEmpty) {
totalSize += buffer.size
emit(out, Chunk(buffer, Some(totalSize)))
}
completeStage()
} else {
if (buffer.isEmpty) {
pull(in)
} else if (buffer.size > chunkSize) {
val (chunk, nextBuffer) = buffer.splitAt(chunkSize)
buffer = nextBuffer
totalSize += chunk.size
emit(out, Chunk(chunk))
} else {
pull(in)
}
}

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright (C) 2016-2018 Lightbend Inc. <http://www.lightbend.com>
*/

package akka.stream.alpakka.googlecloud.storage.impl

import akka.stream.alpakka.googlecloud.storage.Model.{BucketInfo, BucketListResult, StorageObject}
import spray.json.{DefaultJsonProtocol, JsValue, RootJsonReader}

@akka.annotation.InternalApi
object Formats extends DefaultJsonProtocol {

private final case class BucketListResultJson(kind: String,
nextPageToken: Option[String],
prefixes: Option[List[String]],
items: Option[List[StorageObject]])

implicit val storageObjectFormat = jsonFormat8(StorageObject)
implicit val bucketInfoFormat = jsonFormat4(BucketInfo)

private implicit val bucketListResultJsonReads = jsonFormat4(BucketListResultJson)

implicit object BucketListResultReads extends RootJsonReader[BucketListResult] {
override def read(json: JsValue): BucketListResult = {
val res = bucketListResultJsonReads.read(json)
BucketListResult(
res.kind,
res.nextPageToken,
res.prefixes,
res.items.getOrElse(List.empty)
)
}
}

}
Loading