Google cloud storage support (#1340)
2m authored Jun 25, 2019
2 parents 1d9899c + eeac4a4 commit be3b89a
Showing 41 changed files with 5,320 additions and 0 deletions.
2 changes: 2 additions & 0 deletions .travis.yml
@@ -66,6 +66,8 @@ jobs:
- env:
- DIR=google-cloud-pub-sub-grpc
- PRE_CMD="docker-compose up -d gcloud-pubsub-emulator_prep"
- env:
- DIR=google-cloud-storage
- env:
- DIR=google-fcm
- env:
4 changes: 4 additions & 0 deletions build.sbt
@@ -15,6 +15,7 @@ lazy val modules: Seq[ProjectReference] = Seq(
geode,
googleCloudPubSub,
googleCloudPubSubGrpc,
googleCloudStorage,
googleFcm,
hbase,
hdfs,
@@ -173,6 +174,9 @@ lazy val googleCloudPubSubGrpc = alpakkaProject(
crossScalaVersions --= Seq(Dependencies.Scala211, Dependencies.Scala213) // https://github.com/akka/akka-grpc/pull/599
).enablePlugins(AkkaGrpcPlugin, JavaAgent)

lazy val googleCloudStorage =
alpakkaProject("google-cloud-storage", "google.cloud.storage", Dependencies.GoogleStorage).disablePlugins(MimaPlugin)

lazy val googleFcm = alpakkaProject(
"google-fcm",
"google.firebase.fcm",
153 changes: 153 additions & 0 deletions docs/src/main/paradox/google-cloud-storage.md
@@ -0,0 +1,153 @@
# Google Cloud Storage

Google Cloud Storage allows world-wide storage and retrieval of any amount of data at any time.

Further information at the official [Google Cloud Storage documentation website](https://cloud.google.com/storage/docs/).
This connector communicates to Cloud Storage via HTTP requests.

@@project-info{ projectId="google-cloud-storage" }

## Artifacts

@@dependency [sbt,Maven,Gradle] {
group=com.lightbend.akka
artifact=akka-stream-alpakka-google-cloud-storage_$scala.binary.version$
version=$project.version$
}

The table below shows direct dependencies of this module and the second tab shows all libraries it depends on transitively.

@@dependencies { projectId="google-cloud-storage" }

## Configuration

The settings for the Google Cloud Storage connector are read by default from the `alpakka.google.cloud.storage` configuration section.
If you use a non-standard configuration path or need multiple different configurations, please refer to @ref[the attributes section below](google-cloud-storage.md#apply-google-cloud-storage-settings-to-a-part-of-the-stream) to see how to apply different configuration to different parts of the stream.
You'll first need to prepare your credentials for access to Google Cloud Storage.
All of the available configuration settings can be found in the @github[application.conf](/google-cloud-storage/src/test/resources/application.conf).

HOCON:
: @@snip [snip](/google-cloud-storage/src/test/resources/application.conf) { #settings }
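
As a rough illustration of how a configuration section like this can be inspected, the sketch below parses an inline HOCON fragment with Typesafe Config, the configuration library Akka builds on. The keys and values are copied from the module's `reference.conf`; the object name `SettingsSketch` is made up for this example, and a real application would normally let the connector read the settings itself.

```scala
import com.typesafe.config.{Config, ConfigFactory}

object SettingsSketch {
  // Inline copy of the defaults from reference.conf, for illustration only.
  val config: Config = ConfigFactory.parseString(
    """
      |alpakka.google.cloud.storage {
      |  base-url = "https://www.googleapis.com/"
      |  base-path = "/storage/v1"
      |  token-url = "https://www.googleapis.com/oauth2/v4/token"
      |  token-scope = "https://www.googleapis.com/auth/devstorage.read_write"
      |}
      |""".stripMargin
  )

  // Narrow the configuration down to the connector's section.
  val storage: Config = config.getConfig("alpakka.google.cloud.storage")

  def main(args: Array[String]): Unit = {
    println(storage.getString("base-url"))
    println(storage.getString("base-path"))
  }
}
```

Keeping a second section of the same shape under a different path is one way to prepare the alternative configurations mentioned above.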


## Store a file in Google Cloud Storage

A file can be uploaded to Google Cloud Storage by creating a source of @scala[@scaladoc[ByteString](akka.util.ByteString)]@java[@javadoc[ByteString](akka.util.ByteString)] and running that with a sink created from @scala[@scaladoc[GCStorage.resumableUpload](akka.stream.alpakka.googlecloud.storage.scaladsl.GCStorage$)]@java[@scaladoc[GCStorage.resumableUpload](akka.stream.alpakka.googlecloud.storage.javadsl.GCStorage$)].

Scala
: @@snip [snip](/google-cloud-storage/src/test/scala/docs/scaladsl/GCStorageSinkSpec.scala) { #upload }

Java
: @@snip [snip](/google-cloud-storage/src/test/java/docs/javadsl/GCStorageTest.java) { #upload }

## Download a file from Google Cloud Storage

A source for downloading a file can be created by calling @scala[@scaladoc[GCStorage.download](akka.stream.alpakka.googlecloud.storage.scaladsl.GCStorage$)]@java[@scaladoc[GCStorage.download](akka.stream.alpakka.googlecloud.storage.javadsl.GCStorage$)].
It will emit an @scala[`Option`]@java[`Optional`] that holds the file's data, or is empty if no such file can be found.

Scala
: @@snip [snip](/google-cloud-storage/src/test/scala/docs/scaladsl/GCStorageSourceSpec.scala) { #download }

Java
: @@snip [snip](/google-cloud-storage/src/test/java/docs/javadsl/GCStorageTest.java) { #download }
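
The sketch below shows one way to consume such an optional result. It does not call the connector: the stand-in sources, the `Download` type alias, and the assumption that the inner data is itself a `Source[ByteString, NotUsed]` are illustrative guesses rather than something this page confirms. Only standard Akka Streams operators are used, with the Akka 2.5-style `ActorMaterializer`.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.scaladsl.Source
import akka.util.ByteString

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object DownloadShapeSketch {
  // Assumed element type of a download source (hypothetical alias).
  type Download = Source[Option[Source[ByteString, NotUsed]], NotUsed]

  // Concatenate the file's chunks, or produce an empty ByteString if no file was found.
  def contents(download: Download)(implicit mat: Materializer): Future[ByteString] =
    download
      .flatMapConcat {
        case Some(data) => data
        case None       => Source.empty[ByteString]
      }
      .runFold(ByteString.empty)(_ ++ _)

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("download-sketch")
    implicit val mat: Materializer = ActorMaterializer()

    // Stand-ins for "file found" and "file missing".
    val found: Download =
      Source.single(Some(Source(List(ByteString("hello "), ByteString("world")))))
    val missing: Download = Source.single(None)

    println(Await.result(contents(found), 3.seconds).utf8String) // hello world
    println(Await.result(contents(missing), 3.seconds).isEmpty)  // true
    system.terminate()
  }
}
```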


## Access object metadata without downloading object from Google Cloud Storage

If you do not need the object itself, you can query only its metadata using a source from @scala[@scaladoc[GCStorage.getObject](akka.stream.alpakka.googlecloud.storage.scaladsl.GCStorage$)]@java[@scaladoc[GCStorage.getObject](akka.stream.alpakka.googlecloud.storage.javadsl.GCStorage$)].

Scala
: @@snip [snip](/google-cloud-storage/src/test/scala/docs/scaladsl/GCStorageSourceSpec.scala) { #objectMetadata }

Java
: @@snip [snip](/google-cloud-storage/src/test/java/docs/javadsl/GCStorageTest.java) { #objectMetadata }

## List bucket contents

To get a list of all objects in a bucket, use @scala[@scaladoc[GCStorage.listBucket](akka.stream.alpakka.googlecloud.storage.scaladsl.GCStorage$)]@java[@scaladoc[GCStorage.listBucket](akka.stream.alpakka.googlecloud.storage.javadsl.GCStorage$)].
When run, this will give a stream of @scaladoc[StorageObject](akka.stream.alpakka.googlecloud.storage.StorageObject).

Scala
: @@snip [snip](/google-cloud-storage/src/test/scala/docs/scaladsl/GCStorageSourceSpec.scala) { #list-bucket }

Java
: @@snip [snip](/google-cloud-storage/src/test/java/docs/javadsl/GCStorageTest.java) { #list-bucket }

## Rewrite (multi part)

Copy a Google Cloud Storage object from a source bucket to a target bucket using @scala[@scaladoc[GCStorage.rewrite](akka.stream.alpakka.googlecloud.storage.scaladsl.GCStorage$)]@java[@scaladoc[GCStorage.rewrite](akka.stream.alpakka.googlecloud.storage.javadsl.GCStorage$)].
When run, this will emit a single @scaladoc[StorageObject](akka.stream.alpakka.googlecloud.storage.StorageObject) with the information about the copied object.

Scala
: @@snip [snip](/google-cloud-storage/src/test/scala/docs/scaladsl/GCStorageSinkSpec.scala) { #rewrite }

Java
: @@snip [snip](/google-cloud-storage/src/test/java/docs/javadsl/GCStorageTest.java) { #rewrite }

## Apply Google Cloud Storage settings to a part of the stream

It is possible to make one part of the stream use different @scaladoc[GCStorageSettings](akka.stream.alpakka.googlecloud.storage.GCStorageSettings) from the rest of the graph.
This can be useful when one stream is used to copy files across regions with different service accounts.
You can attach a custom `GCStorageSettings` instance or a custom config path to a graph using attributes from @scaladoc[GCStorageAttributes](akka.stream.alpakka.googlecloud.storage.GCStorageAttributes$):

Scala
: @@snip [snip](/google-cloud-storage/src/test/scala/docs/scaladsl/GCStorageSourceSpec.scala) { #list-bucket-attributes }

Java
: @@snip [snip](/google-cloud-storage/src/test/java/docs/javadsl/GCStorageTest.java) { #list-bucket-attributes }
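
A minimal sketch of the attribute mechanics, using only `GCStorageAttributes.settingsPath` and `GCStorageSettingsPath` as defined in this commit plus standard Akka Streams calls. The config path `my-app.gcs-europe` and the placeholder source are hypothetical; in practice the attributes would be attached to a source or sink obtained from `GCStorage`.

```scala
import akka.NotUsed
import akka.stream.Attributes
import akka.stream.alpakka.googlecloud.storage.{GCStorageAttributes, GCStorageSettingsPath}
import akka.stream.scaladsl.Source

object AttributesSketch {
  // Hypothetical config path; a section with this name must exist in your configuration.
  val customPath = "my-app.gcs-europe"

  // Attributes that point this part of the stream at the custom config section.
  val attributes: Attributes = GCStorageAttributes.settingsPath(customPath)

  // Placeholder standing in for a GCStorage source; attributes attach the same way.
  val placeholder: Source[String, NotUsed] =
    Source.single("placeholder").withAttributes(attributes)

  def main(args: Array[String]): Unit =
    // The attribute can be looked up again by its type.
    println(attributes.get[GCStorageSettingsPath].map(_.path)) // Some(my-app.gcs-europe)
}
```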


## Bucket management

The bucket management API offers each operation both as a Source and as a Future / CompletionStage.
With the Future API, attributes can be passed to the request in the method call itself; with Sources, they are applied via `.withAttributes`.
For more information about attributes, see @scaladoc[GCStorageAttributes](akka.stream.alpakka.googlecloud.storage.GCStorageAttributes$) and @scaladoc[Attributes](akka.stream.Attributes).

### Make bucket
To create a bucket in Google Cloud Storage you need to specify its unique name. The name has to follow the [bucket naming requirements](https://cloud.google.com/storage/docs/naming).
The bucket will be created in the given location.

Scala
: @@snip [snip](/google-cloud-storage/src/test/scala/docs/scaladsl/GCStorageSourceSpec.scala) { #make-bucket }

Java
: @@snip [snip](/google-cloud-storage/src/test/java/docs/javadsl/GCStorageTest.java) { #make-bucket }


### Delete bucket
To delete a bucket you need to specify its name and the bucket needs to be empty.

Scala
: @@snip [snip](/google-cloud-storage/src/test/scala/docs/scaladsl/GCStorageSourceSpec.scala) { #delete-bucket }

Java
: @@snip [snip](/google-cloud-storage/src/test/java/docs/javadsl/GCStorageTest.java) { #delete-bucket }


### Get bucket
To get a bucket you need to specify its name.

Scala
: @@snip [snip](/google-cloud-storage/src/test/scala/docs/scaladsl/GCStorageSourceSpec.scala) { #get-bucket }

Java
: @@snip [snip](/google-cloud-storage/src/test/java/docs/javadsl/GCStorageTest.java) { #get-bucket }


## Running the example code

The code in this guide is part of runnable tests of this project. You are welcome to edit the code and run it in sbt.

Scala
: ```
sbt
> google-cloud-storage/test
```

Java
: ```
sbt
> google-cloud-storage/test
```

> Some test code requires access to Google Cloud Storage. To run those tests you will need to configure a project in Google Cloud and provide your own credentials.
1 change: 1 addition & 0 deletions docs/src/main/paradox/index.md
@@ -33,6 +33,7 @@ The [Alpakka project](https://doc.akka.io/docs/alpakka/current/) is an open sour
* [FTP](ftp.md)
* [Google Cloud Pub/Sub](google-cloud-pub-sub.md)
* [Google Cloud Pub/Sub gRPC](google-cloud-pub-sub-grpc.md)
* [Google Cloud Storage](google-cloud-storage.md)
* [Google Firebase Cloud Messaging](google-fcm.md)
* [gRPC](external/grpc.md)
* [Hadoop Distributed File System](hdfs.md)
6 changes: 6 additions & 0 deletions google-cloud-storage/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
alpakka.google.cloud.storage {
base-url = "https://www.googleapis.com/" // default
base-path = "/storage/v1" // default
token-url = "https://www.googleapis.com/oauth2/v4/token" // default
token-scope = "https://www.googleapis.com/auth/devstorage.read_write" // default
}
@@ -0,0 +1,126 @@
/*
* Copyright (C) 2016-2019 Lightbend Inc. <http://www.lightbend.com>
*/

package akka.stream.alpakka.googlecloud.storage

/**
* Represents a bucket in Google Cloud Storage.
*
* @param name The name of the bucket
* @param location The location of the bucket. Object data for objects in the bucket resides in physical storage within this region. Defaults to US.
* @param kind The kind of item this is
* @param id The ID of the bucket
* @param selfLink The URI of this bucket
* @param etag HTTP 1.1 Entity tag for the bucket
*/
final class Bucket private (
val name: String,
val location: String,
val kind: String,
val id: String,
val selfLink: String,
val etag: String
) {

/** Java API */
def getName: String = name

/** Java API */
def getLocation: String = location

/** Java API */
def getKind: String = kind

/** Java API */
def getId: String = id

/** Java API */
def getSelfLink: String = selfLink

/** Java API */
def getEtag: String = etag

def withName(value: String): Bucket = copy(name = value)
def withLocation(value: String): Bucket = copy(location = value)
def withKind(value: String): Bucket = copy(kind = value)
def withId(value: String): Bucket = copy(id = value)
def withSelfLink(value: String): Bucket = copy(selfLink = value)
def withEtag(value: String): Bucket = copy(etag = value)

private def copy(
name: String = name,
location: String = location,
kind: String = kind,
id: String = id,
selfLink: String = selfLink,
etag: String = etag
): Bucket = new Bucket(
name = name,
location = location,
kind = kind,
id = id,
selfLink = selfLink,
etag = etag
)

override def toString =
"BucketInfo(" +
s"name=$name," +
s"location=$location," +
s"kind=$kind," +
s"id=$id," +
s"selfLink=$selfLink," +
s"etag=$etag" +
")"

override def equals(other: Any): Boolean = other match {
case that: Bucket =>
java.util.Objects.equals(this.name, that.name) &&
java.util.Objects.equals(this.location, that.location) &&
java.util.Objects.equals(this.kind, that.kind) &&
java.util.Objects.equals(this.id, that.id) &&
java.util.Objects.equals(this.selfLink, that.selfLink) &&
java.util.Objects.equals(this.etag, that.etag)
case _ => false
}

override def hashCode(): Int =
java.util.Objects.hash(name, location, kind, id, selfLink, etag)
}
object Bucket {

/** Scala API */
def apply(
name: String,
location: String,
kind: String,
id: String,
selfLink: String,
etag: String
): Bucket = new Bucket(
name,
location,
kind,
id,
selfLink,
etag
)

/** Java API */
def create(
name: String,
location: String,
kind: String,
id: String,
selfLink: String,
etag: String
): Bucket = new Bucket(
name,
location,
kind,
id,
selfLink,
etag
)
}
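
A short usage sketch for the `Bucket` class above: the private constructor plus `with*` methods give an immutable value with copy-style updates and structural equality. All field values here are made-up sample data, and `BucketSketch` is a hypothetical name.

```scala
import akka.stream.alpakka.googlecloud.storage.Bucket

object BucketSketch {
  // Sample values only; a real Bucket would normally come from a GCStorage call.
  val bucket: Bucket = Bucket(
    name = "my-bucket",
    location = "US",
    kind = "storage#bucket",
    id = "my-bucket",
    selfLink = "https://example.invalid/b/my-bucket",
    etag = "sample-etag"
  )

  // with* methods return a modified copy and leave the original untouched.
  val moved: Bucket = bucket.withLocation("EU")

  def main(args: Array[String]): Unit = {
    println(bucket.location)                    // US
    println(moved.location)                     // EU
    println(moved.withLocation("US") == bucket) // true, equality is field by field
  }
}
```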
@@ -0,0 +1,24 @@
/*
* Copyright (C) 2016-2019 Lightbend Inc. <http://www.lightbend.com>
*/

package akka.stream.alpakka.googlecloud.storage

import scala.collection.immutable.Seq
import scala.collection.JavaConverters._

final class FailedUpload private (
val reasons: Seq[Throwable]
) extends Exception(reasons.map(_.getMessage).mkString(", ")) {

/** Java API */
def getReasons: java.util.List[Throwable] = reasons.asJava
}

object FailedUpload {

def apply(reasons: Seq[Throwable]) = new FailedUpload(reasons)

/** Java API */
def create(reasons: java.util.List[Throwable]) = FailedUpload(reasons.asScala.toList)
}
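
To illustrate how `FailedUpload` aggregates its causes: the exception message is the individual messages joined with `", "`, and the Java accessor exposes the same reasons as a `java.util.List`. The two exceptions below are invented sample failures, and `FailedUploadSketch` is a hypothetical name.

```scala
import akka.stream.alpakka.googlecloud.storage.FailedUpload

import scala.collection.immutable.Seq

object FailedUploadSketch {
  // Invented sample failures, standing in for errors collected during an upload.
  val failure: FailedUpload = FailedUpload(
    Seq(
      new RuntimeException("chunk 1 rejected"),
      new RuntimeException("chunk 2 timed out")
    )
  )

  def main(args: Array[String]): Unit = {
    println(failure.getMessage)      // chunk 1 rejected, chunk 2 timed out
    println(failure.getReasons.size) // 2
  }
}
```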
@@ -0,0 +1,36 @@
/*
* Copyright (C) 2016-2019 Lightbend Inc. <http://www.lightbend.com>
*/

package akka.stream.alpakka.googlecloud.storage

import akka.stream.Attributes
import akka.stream.Attributes.Attribute

/**
* Akka Stream attributes that are used when materializing GCStorage stream blueprints.
*/
object GCStorageAttributes {

/**
* Settings to use for the GCStorage stream
*/
def settings(settings: GCStorageSettings): Attributes = Attributes(GCStorageSettingsValue(settings))

/**
* Config path which will be used to resolve required GCStorage settings
*/
def settingsPath(path: String): Attributes = Attributes(GCStorageSettingsPath(path))
}

final class GCStorageSettingsPath private (val path: String) extends Attribute
object GCStorageSettingsPath {
val Default = GCStorageSettingsPath(GCStorageSettings.ConfigPath)

def apply(path: String) = new GCStorageSettingsPath(path)
}

final class GCStorageSettingsValue private (val settings: GCStorageSettings) extends Attribute
object GCStorageSettingsValue {
def apply(settings: GCStorageSettings) = new GCStorageSettingsValue(settings)
}