Skip to content

Commit

Permalink
createBucket request, akka#938
Browse files Browse the repository at this point in the history
    * populate createBucket request
    * test cases createBucket
  • Loading branch information
sfali committed Jun 6, 2018
1 parent 5d93fb4 commit 49c9dc2
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 2 deletions.
21 changes: 21 additions & 0 deletions s3/src/main/scala/akka/stream/alpakka/s3/impl/HttpRequests.scala
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,27 @@ private[alpakka] object HttpRequests {
.withUri(requestUri(bucket, None).withQuery(query))
}

def createBucket(bucket: String, s3Headers: S3Headers = S3Headers.empty)(implicit ec: ExecutionContext,
conf: S3Settings): Future[HttpRequest] = {
val region = conf.s3RegionProvider.getRegion
val headers = s3Headers.headers :+ Host(requestAuthority(bucket, region))
val httpRequest = HttpRequest(HttpMethods.PUT)
.withHeaders(headers: _*)
.withUri(requestUri(bucket, None))

Option(region) match {
case Some(r) if r != "us-east-1" =>
// region other than us-east-1
// @formatter:off
val payload = <CreateBucketConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/"><LocationConstraint>{r}</LocationConstraint></CreateBucketConfiguration>
// @formatter:on
for {
entity <- Marshal(payload).to[RequestEntity]
} yield httpRequest.withEntity(entity)
case _ => Future.successful(httpRequest)
}
}

def getDownloadRequest(s3Location: S3Location,
method: HttpMethod = HttpMethods.GET,
s3Headers: S3Headers = S3Headers.empty,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,19 @@ import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.Uri.Query
import akka.http.scaladsl.model.headers.{ByteRange, RawHeader}
import akka.http.scaladsl.model.{HttpEntity, HttpRequest, IllegalUriException, MediaTypes}
import akka.http.scaladsl.model._
import akka.stream.ActorMaterializer
import akka.stream.alpakka.s3.acl.CannedAcl
import akka.stream.alpakka.s3.{BufferType, MemoryBufferType, Proxy, S3Settings}
import akka.stream.scaladsl.Source
import akka.stream.scaladsl.{Sink, Source}
import akka.testkit.{SocketUtil, TestProbe}
import com.amazonaws.auth.{AWSCredentialsProvider, AWSStaticCredentialsProvider, AnonymousAWSCredentials}
import com.amazonaws.regions.AwsRegionProvider
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.{FlatSpec, Matchers}

import scala.collection.immutable

class HttpRequestsSpec extends FlatSpec with Matchers with ScalaFutures {

// test fixtures
Expand Down Expand Up @@ -384,4 +386,71 @@ class HttpRequestsSpec extends FlatSpec with Matchers with ScalaFutures {
request.headers should contain(RawHeader("x-amz-copy-source", "/source-bucket/some/source-key?versionId=abcdwxyz"))
request.headers should contain(RawHeader("x-amz-copy-source-range", "bytes=0-5242879"))
}

it should "create bucket request in default region" in {
implicit val settings: S3Settings = getSettings()

implicit val system: ActorSystem = ActorSystem("HttpRequestsSpec")
import system.dispatcher
implicit val mat: ActorMaterializer = ActorMaterializer()

val eventualHttpRequest = HttpRequests.createBucket("sample")
whenReady(eventualHttpRequest) { httpRequest =>
val maybeHostHeader = getHeader("host", httpRequest.headers)
maybeHostHeader.fold(fail("Unable to get host header")) { hostHeader =>
hostHeader.value() shouldEqual "sample.s3.amazonaws.com"
}
val eventualEntity = httpRequest.entity.dataBytes.map(_.utf8String).runWith(Sink.head)
whenReady(eventualEntity) { entity =>
entity shouldBe empty
mat.shutdown()
system.terminate()
}
}
}

it should "create bucket request in a region other than default region" in {
implicit val settings: S3Settings = getSettings(s3Region = "EU")

implicit val system: ActorSystem = ActorSystem("HttpRequestsSpec")
import system.dispatcher
implicit val mat: ActorMaterializer = ActorMaterializer()

val eventualHttpRequest = HttpRequests.createBucket("sample")
whenReady(eventualHttpRequest) { httpRequest =>
val maybeHostHeader = getHeader("host", httpRequest.headers)
maybeHostHeader.fold(fail("Unable to get host header")) { hostHeader =>
hostHeader.value() shouldEqual "sample.s3-eu.amazonaws.com"
}
val eventualEntity = httpRequest.entity.dataBytes.map(_.utf8String).runWith(Sink.head)
whenReady(eventualEntity) { entity =>
entity shouldEqual """<CreateBucketConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/"><LocationConstraint>EU</LocationConstraint></CreateBucketConfiguration>"""
mat.shutdown()
system.terminate()
}
}
}

it should "create bucket request with custom header" in {
implicit val settings: S3Settings = getSettings()

implicit val system: ActorSystem = ActorSystem("HttpRequestsSpec")
import system.dispatcher
implicit val mat: ActorMaterializer = ActorMaterializer()

val eventualHttpRequest = HttpRequests.createBucket("sample", S3Headers(acl))
whenReady(eventualHttpRequest) { httpRequest =>
val headers = httpRequest.headers
headers should have length 2
val maybeAclHeader = getHeader("x-amz-acl", headers)
maybeAclHeader.fold(fail("unable to get x-amz-acl")) { aclHeader =>
aclHeader.value() shouldEqual acl.value
mat.shutdown()
system.terminate()
}
}
}

private def getHeader(lowercaseName: String, headers: immutable.Seq[HttpHeader]): Option[HttpHeader] =
headers.find(_.lowercaseName() == lowercaseName)
}

0 comments on commit 49c9dc2

Please sign in to comment.