diff --git a/.travis.yml b/.travis.yml
index b8e8872e..ee67849e 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -42,6 +42,8 @@ before_deploy:
   - echo "Ensure assembly jar file is created for a $TRAVIS_TAG"
   - ./sbtx assembly
   - ls -lh ./target/scala-2.12/
+  - export JAR_VERSION=${TRAVIS_TAG#v}
+  - echo "Jar artifact version $JAR_VERSION"
 
 deploy:
   provider: releases
@@ -55,7 +57,7 @@
       hF48soVocY1xus5AkKiMxrW6d93Th2XTGoyRzJbwm4iXPY1UIKndlkEjFq3RsZRIPND9iURmp/qcwvlIdB29SsczYbH
       p3QOQn/NTC6SZbmgAW4xZpCRUUZwfOXP4RacIcDKHlsUjqBZwmSmxK/vJ6KRNR4yxBn7cVlm060cD5l3TmpuUC6X9JI
       EPAkYJyNJ1CtRUkYDbgBn+Eof5X3jOZqo8pI51YBKdnz0E=
-  file: ./target/scala-2.12/cloud-storage-etl-udfs-${TRAVIS_TAG}.jar
+  file: ./target/scala-2.12/cloud-storage-etl-udfs-${JAR_VERSION}.jar
   skip_cleanup: true
   on:
     repo: exasol/cloud-storage-etl-udfs
diff --git a/README.md b/README.md
index 81245759..da6cad9b 100644
--- a/README.md
+++ b/README.md
@@ -100,11 +100,11 @@ CREATE TABLE SALES_POSITIONS (
 
 IMPORT INTO SALES_POSITIONS
 FROM SCRIPT ETL.IMPORT_PATH WITH
-  BUCKET_PATH = 's3a://exa-mo-frankfurt/test/retail/sales_positions/*'
-  S3_ACCESS_KEY = 'MY_AWS_ACCESS_KEY'
-  S3_SECRET_KEY = 'MY_AWS_SECRET_KEY'
-  S3_ENDPOINT = 's3.MY_REGION.amazonaws.com'
-  PARALLELISM = 'nproc()*10';
+  BUCKET_PATH = 's3a://exa-mo-frankfurt/test/retail/sales_positions/*'
+  S3_ACCESS_KEY = 'MY_AWS_ACCESS_KEY'
+  S3_SECRET_KEY = 'MY_AWS_SECRET_KEY'
+  S3_ENDPOINT = 's3.MY_REGION.amazonaws.com'
+  PARALLELISM = 'nproc()*10';
 
 -- MY_REGION is one of AWS regions, for example, eu-central-1
 
@@ -130,10 +130,23 @@ And then run import,
 ```sql
 IMPORT INTO SALES_POSITIONS
 FROM SCRIPT ETL.IMPORT_PATH WITH
-  BUCKET_PATH = 'gs://exa-test-bucket/data/parquet/sales_positions/*'
-  GCS_PROJECT_ID = 'MY_GCS_PORJECT_ID'
-  GCS_KEYFILE_PATH = 'MY_BUCKETFS_PATH/project-id-service-keyfile.json'
-  PARALLELISM = 'nproc()*10';
+  BUCKET_PATH = 'gs://exa-test-bucket/data/parquet/sales_positions/*'
+  GCS_PROJECT_ID = 'MY_GCS_PROJECT_ID'
+  GCS_KEYFILE_PATH = 'MY_BUCKETFS_PATH/project-id-service-keyfile.json'
+  PARALLELISM = 'nproc()*10';
+
+SELECT * FROM SALES_POSITIONS LIMIT 10;
+```
+
+#### Import from Azure Blob Store
+
+```sql
+IMPORT INTO SALES_POSITIONS
+FROM SCRIPT ETL.IMPORT_PATH WITH
+  BUCKET_PATH = 'wasbs://<AZURE_CONTAINER_NAME>@<AZURE_ACCOUNT_NAME>.blob.core.windows.net/sales-positions/*'
+  AZURE_ACCOUNT_NAME = 'MY_AZURE_STORAGE_ACCOUNT_NAME'
+  AZURE_SECRET_KEY = 'MY_AZURE_STORAGE_SECRET_KEY'
+  PARALLELISM = 'nproc()*10';
 
 SELECT * FROM SALES_POSITIONS LIMIT 10;
 ```
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index 5e83190c..ba07733e 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -15,9 +15,11 @@ object Dependencies {
     "com.exasol" % "exasol-jdbc" % "6.0.8",
     "com.exasol" % "exasol-script-api" % "6.0.8",
     "org.apache.hadoop" % "hadoop-aws" % "2.8.4",
+    "org.apache.hadoop" % "hadoop-azure" % "2.8.4",
     "org.apache.hadoop" % "hadoop-common" % "2.8.4" exclude ("org.slf4j", "slf4j-log4j12"),
     "org.apache.hadoop" % "hadoop-hdfs" % "2.8.4",
     "org.apache.parquet" % "parquet-avro" % "1.8.1",
+    "com.microsoft.azure" % "azure-storage" % "2.2.0",
     "com.google.cloud.bigdataoss" % "gcs-connector" % "hadoop2-1.9.10"
   )
 
diff --git a/src/main/scala/com/exasol/cloudetl/bucket/Bucket.scala b/src/main/scala/com/exasol/cloudetl/bucket/Bucket.scala
index 6067013b..68cc77f2 100644
--- a/src/main/scala/com/exasol/cloudetl/bucket/Bucket.scala
+++ b/src/main/scala/com/exasol/cloudetl/bucket/Bucket.scala
@@ -70,14 +70,44 @@ final case class GCSBucket(path: String, params: Map[String, String]) extends Bu
 
 }
 
+final case class AzureBlobBucket(path: String, params: Map[String, String]) extends Bucket {
+
+  override val bucketPath: String = path
+
+  override def validate(): Unit =
+    Bucket.validate(params, Bucket.AZURE_PARAMETERS)
+
+  override def createConfiguration(): Configuration = {
+    validate()
+
+    val conf = new Configuration()
+    val accountName = Bucket.requiredParam(params, "AZURE_ACCOUNT_NAME")
+    val accountSecretKey = Bucket.requiredParam(params, "AZURE_SECRET_KEY")
+    conf.set("fs.azure", classOf[org.apache.hadoop.fs.azure.NativeAzureFileSystem].getName)
+    conf.set("fs.wasb.impl", classOf[org.apache.hadoop.fs.azure.NativeAzureFileSystem].getName)
+    conf.set("fs.wasbs.impl", classOf[org.apache.hadoop.fs.azure.NativeAzureFileSystem].getName)
+    conf.set("fs.AbstractFileSystem.wasb.impl", classOf[org.apache.hadoop.fs.azure.Wasb].getName)
+    conf.set(
+      "fs.AbstractFileSystem.wasbs.impl",
+      classOf[org.apache.hadoop.fs.azure.Wasbs].getName
+    )
+    conf.set(s"fs.azure.account.key.$accountName.blob.core.windows.net", accountSecretKey)
+
+    conf
+  }
+
+}
+
 final case class LocalBucket(path: String, params: Map[String, String]) extends Bucket {
 
   override val bucketPath: String = path
 
   override def validate(): Unit = ()
 
-  override def createConfiguration(): Configuration =
+  override def createConfiguration(): Configuration = {
+    validate()
     new Configuration()
+  }
 
 }
 
@@ -88,10 +118,12 @@
     val scheme = getScheme(path)
 
     scheme match {
-      case "s3a"  => S3Bucket(path, params)
-      case "gs"   => GCSBucket(path, params)
-      case "file" => LocalBucket(path, params)
-      case _      => throw new IllegalArgumentException(s"Unknown path scheme $scheme")
+      case "s3a"            => S3Bucket(path, params)
+      case "gs"             => GCSBucket(path, params)
+      case "wasb" | "wasbs" => AzureBlobBucket(path, params)
+      case "file"           => LocalBucket(path, params)
+      case _ =>
+        throw new IllegalArgumentException(s"Unsupported path scheme $scheme")
     }
   }
 
@@ -115,14 +147,14 @@ object Bucket extends LazyLogging {
 
   def mapToStr(params: Map[String, String]): String = {
     val selectedParams = (params -- Seq("PARALLELISM"))
-    selectedParams.map { case (k, v) => s"$k=$v" }.mkString(";")
+    selectedParams.map { case (k, v) => s"$k:=:$v" }.mkString(";")
   }
 
   def strToMap(str: String): Map[String, String] =
     str
       .split(";")
       .map { word =>
-        val kv = word.split("=")
+        val kv = word.split(":=:")
         kv(0) -> kv(1)
       }
       .toMap
@@ -135,4 +167,7 @@
   final val GCS_PARAMETERS: Seq[String] =
     Seq("GCS_PROJECT_ID", "GCS_KEYFILE_PATH")
 
+  final val AZURE_PARAMETERS: Seq[String] =
+    Seq("AZURE_ACCOUNT_NAME", "AZURE_SECRET_KEY")
+
 }
diff --git a/src/test/scala/com/exasol/cloudetl/bucket/BucketSuite.scala b/src/test/scala/com/exasol/cloudetl/bucket/BucketSuite.scala
new file mode 100644
index 00000000..d45dc70d
--- /dev/null
+++ b/src/test/scala/com/exasol/cloudetl/bucket/BucketSuite.scala
@@ -0,0 +1,86 @@
+package com.exasol.cloudetl.bucket
+
+import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem
+import org.apache.hadoop.fs.azure.NativeAzureFileSystem
+import org.apache.hadoop.fs.azure.Wasb
+import org.apache.hadoop.fs.azure.Wasbs
+import org.apache.hadoop.fs.s3a.S3AFileSystem
+import org.scalatest.FunSuite
+import org.scalatest.Matchers
+
+@SuppressWarnings(Array("org.wartremover.warts.IsInstanceOf"))
+class BucketSuite extends FunSuite with Matchers {
+
+  test("throws an exception if the scheme is not supported") {
+    val thrown = intercept[IllegalArgumentException] {
+      Bucket(Map(Bucket.BUCKET_PATH -> "xyz:/bucket/files*"))
+    }
+    assert(thrown.getMessage === "Unsupported path scheme xyz")
+  }
+
+  test("creates a LocalBucket with path parameter") {
+    val bucket = Bucket(Map(Bucket.BUCKET_PATH -> "file://local/path/bucket/"))
+
+    assert(bucket.isInstanceOf[LocalBucket])
+  }
+
+  test("creates an S3Bucket with given parameters") {
+    val s3params = Map(
+      Bucket.BUCKET_PATH -> "s3a://my-bucket/",
+      "S3_ENDPOINT" -> "eu-central-1",
+      "S3_ACCESS_KEY" -> "abc",
+      "S3_SECRET_KEY" -> "xyz"
+    )
+
+    val bucket = Bucket(s3params)
+    val conf = bucket.createConfiguration()
+
+    assert(bucket.isInstanceOf[S3Bucket])
+    assert(conf.get("fs.s3a.impl") === classOf[S3AFileSystem].getName)
+    assert(conf.get("fs.s3a.endpoint") === "eu-central-1")
+    assert(conf.get("fs.s3a.access.key") === "abc")
+    assert(conf.get("fs.s3a.secret.key") === "xyz")
+  }
+
+  test("creates a GCSBucket with given parameters") {
+    val gcsParams = Map(
+      Bucket.BUCKET_PATH -> "gs://my-bucket/",
+      "GCS_PROJECT_ID" -> "projX",
+      "GCS_KEYFILE_PATH" -> "/bucketfs/bucket1/projX.json"
+    )
+
+    val bucket = Bucket(gcsParams)
+    val conf = bucket.createConfiguration()
+
+    assert(bucket.isInstanceOf[GCSBucket])
+    assert(conf.get("fs.gs.impl") === classOf[GoogleHadoopFileSystem].getName)
+    assert(conf.get("fs.gs.project.id") === "projX")
+    assert(conf.get("fs.gs.auth.service.account.json.keyfile") === "/bucketfs/bucket1/projX.json")
+  }
+
+  test("creates an AzureBlobBucket with given parameters") {
+    val azureBlobParams = Map(
+      Bucket.BUCKET_PATH -> "wasbs://container@account1/parquet-bucket/",
+      "AZURE_ACCOUNT_NAME" -> "account1",
+      "AZURE_SECRET_KEY" -> "secret"
+    )
+
+    val bucket = Bucket(azureBlobParams)
+    val conf = bucket.createConfiguration()
+
+    assert(bucket.isInstanceOf[AzureBlobBucket])
+    assert(conf.get("fs.azure.account.key.account1.blob.core.windows.net") === "secret")
+    val mappings = Map(
+      "fs.azure" -> classOf[NativeAzureFileSystem].getName,
+      "fs.wasb.impl" -> classOf[NativeAzureFileSystem].getName,
+      "fs.wasbs.impl" -> classOf[NativeAzureFileSystem].getName,
+      "fs.AbstractFileSystem.wasb.impl" -> classOf[Wasb].getName,
+      "fs.AbstractFileSystem.wasbs.impl" -> classOf[Wasbs].getName
+    )
+    mappings.foreach {
+      case (given, expected) =>
+        assert(conf.get(given) === expected)
+    }
+  }
+
+}
diff --git a/src/test/scala/com/exasol/cloudetl/scriptclasses/ImportPathSuite.scala b/src/test/scala/com/exasol/cloudetl/scriptclasses/ImportPathSuite.scala
index 0a8fe23e..9a58e602 100644
--- a/src/test/scala/com/exasol/cloudetl/scriptclasses/ImportPathSuite.scala
+++ b/src/test/scala/com/exasol/cloudetl/scriptclasses/ImportPathSuite.scala
@@ -17,8 +17,8 @@ class ImportPathSuite extends BaseImportSuite {
     when(exaSpec.getParameters()).thenReturn(params.asJava)
 
     val rest =
-      s"""BUCKET_PATH=$s3BucketPath;S3_ENDPOINT=$s3Endpoint;""" +
-        s"""S3_ACCESS_KEY=$s3AccessKey;S3_SECRET_KEY=$s3SecretKey"""
+      s"""BUCKET_PATH:=:$s3BucketPath;S3_ENDPOINT:=:$s3Endpoint;""" +
+        s"""S3_ACCESS_KEY:=:$s3AccessKey;S3_SECRET_KEY:=:$s3SecretKey"""
 
     val sqlExpected =
       s"""
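
A note on the `Bucket.mapToStr` / `Bucket.strToMap` change above: the key-value separator used when the parameters are serialized into the IMPORT metadata moves from `=` to `:=:`, presumably so that values which themselves contain `=` characters (the Azure secret keys introduced here are base64 strings that typically end in `=` padding) survive the round trip. Below is a minimal, self-contained sketch of that round trip; it mirrors the two patched helpers, and the object name and sample values are illustrative only, not part of the patch.

```scala
// Standalone sketch of the ":=:" serialization round trip (illustrative only).
object ParamsRoundTripSketch {

  // Mirrors Bucket.mapToStr from the patch: drop PARALLELISM, join the rest.
  def mapToStr(params: Map[String, String]): String = {
    val selectedParams = params -- Seq("PARALLELISM")
    selectedParams.map { case (k, v) => s"$k:=:$v" }.mkString(";")
  }

  // Mirrors Bucket.strToMap from the patch: split on ';', then on ':=:'.
  def strToMap(str: String): Map[String, String] =
    str
      .split(";")
      .map { word =>
        val kv = word.split(":=:")
        kv(0) -> kv(1)
      }
      .toMap

  def main(args: Array[String]): Unit = {
    val params = Map(
      "BUCKET_PATH" -> "wasbs://container@account1.blob.core.windows.net/data/*",
      "AZURE_ACCOUNT_NAME" -> "account1",
      // A base64-like secret; with the old "=" separator the trailing
      // padding would be lost when splitting the serialized string.
      "AZURE_SECRET_KEY" -> "c2VjcmV0cw==",
      "PARALLELISM" -> "nproc()*10"
    )
    val serialized = mapToStr(params)
    // PARALLELISM is dropped, everything else survives unchanged.
    assert(strToMap(serialized) == (params -- Seq("PARALLELISM")))
    println(serialized)
  }
}
```

With the previous `=` separator, `"AZURE_SECRET_KEY=c2VjcmV0cw==".split("=")` yields `("AZURE_SECRET_KEY", "c2VjcmV0cw")`, silently corrupting the value; the updated `ImportPathSuite` expectations reflect the new format.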