From 9a8e40babd125cfb1575ae71f90c2bb416956580 Mon Sep 17 00:00:00 2001
From: Muhammet Orazov
Date: Wed, 5 Dec 2018 15:40:49 +0100
Subject: [PATCH 1/5] Remove `v` prefix from jar name when deploying to GitHub releases

---
 .travis.yml | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/.travis.yml b/.travis.yml
index b8e8872e..ee67849e 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -42,6 +42,8 @@ before_deploy:
   - echo "Ensure assembly jar file is created for a $TRAVIS_TAG"
   - ./sbtx assembly
   - ls -lh ./target/scala-2.12/
+  - export JAR_VERSION=${TRAVIS_TAG#v}
+  - echo "Jar artifact version $JAR_VERSION"
 
 deploy:
   provider: releases
@@ -55,7 +57,7 @@ deploy:
     hF48soVocY1xus5AkKiMxrW6d93Th2XTGoyRzJbwm4iXPY1UIKndlkEjFq3RsZRIPND9iURmp/qcwvlIdB29SsczYbH
     p3QOQn/NTC6SZbmgAW4xZpCRUUZwfOXP4RacIcDKHlsUjqBZwmSmxK/vJ6KRNR4yxBn7cVlm060cD5l3TmpuUC6X9JI
     EPAkYJyNJ1CtRUkYDbgBn+Eof5X3jOZqo8pI51YBKdnz0E=
-  file: ./target/scala-2.12/cloud-storage-etl-udfs-${TRAVIS_TAG}.jar
+  file: ./target/scala-2.12/cloud-storage-etl-udfs-${JAR_VERSION}.jar
   skip_cleanup: true
   on:
     repo: exasol/cloud-storage-etl-udfs

From 3decdd1ceba5ead1a2439aabd3f9001ccb1ab93f Mon Sep 17 00:00:00 2001
From: Muhammet Orazov
Date: Thu, 6 Dec 2018 15:36:35 +0100
Subject: [PATCH 2/5] Add initial Azure Blob Storage import

It is now possible to import (Parquet) data from Azure Blob Storage using
the `wasb` or `wasbs` protocol. The Azure storage account name and the
account access key must be provided.
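For reference, a minimal sketch of how these settings are consumed on the
Hadoop side (the account, container, and key below are hypothetical; this
patch wires the same configuration keys through `AzureBlobBucket`):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Bind the wasbs:// scheme to the hadoop-azure filesystem and register the
// per-account access key (all values here are made up).
val conf = new Configuration()
conf.set("fs.wasbs.impl", "org.apache.hadoop.fs.azure.NativeAzureFileSystem")
conf.set("fs.azure.account.key.account1.blob.core.windows.net", "MY_SECRET_KEY")

// Hadoop resolves the filesystem implementation for the URI scheme from the
// binding above and authenticates with the registered account key.
val path = new Path("wasbs://mycontainer@account1.blob.core.windows.net/sales-positions/")
val fs = FileSystem.get(path.toUri, conf)
```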
"com.exasol" % "exasol-script-api" % "6.0.8", "org.apache.hadoop" % "hadoop-aws" % "2.8.4", + "org.apache.hadoop" % "hadoop-azure" % "2.8.4", "org.apache.hadoop" % "hadoop-common" % "2.8.4" exclude ("org.slf4j", "slf4j-log4j12"), "org.apache.hadoop" % "hadoop-hdfs" % "2.8.4", "org.apache.parquet" % "parquet-avro" % "1.8.1", + "com.microsoft.azure" % "azure-storage" % "2.2.0", "com.google.cloud.bigdataoss" % "gcs-connector" % "hadoop2-1.9.10" ) diff --git a/src/main/scala/com/exasol/cloudetl/bucket/Bucket.scala b/src/main/scala/com/exasol/cloudetl/bucket/Bucket.scala index 6067013b..7f5348a6 100644 --- a/src/main/scala/com/exasol/cloudetl/bucket/Bucket.scala +++ b/src/main/scala/com/exasol/cloudetl/bucket/Bucket.scala @@ -70,6 +70,34 @@ final case class GCSBucket(path: String, params: Map[String, String]) extends Bu } +final case class AzureBlobBucket(path: String, params: Map[String, String]) extends Bucket { + + override val bucketPath: String = path + + override def validate(): Unit = + Bucket.validate(params, Bucket.AZURE_PARAMETERS) + + override def createConfiguration(): Configuration = { + validate() + + val conf = new Configuration() + val accountName = Bucket.requiredParam(params, "AZURE_ACCOUNT_NAME") + val accountSecretKey = Bucket.requiredParam(params, "AZURE_SECRET_KEY") + conf.set("fs.azure", classOf[org.apache.hadoop.fs.azure.NativeAzureFileSystem].getName) + conf.set("fs.wasb.impl", classOf[org.apache.hadoop.fs.azure.NativeAzureFileSystem].getName) + conf.set("fs.wasbs.impl", classOf[org.apache.hadoop.fs.azure.NativeAzureFileSystem].getName) + conf.set("fs.AbstractFileSystem.wasb.impl", classOf[org.apache.hadoop.fs.azure.Wasb].getName) + conf.set( + "fs.AbstractFileSystem.wasbs.impl", + classOf[org.apache.hadoop.fs.azure.Wasbs].getName + ) + conf.set(s"fs.azure.account.key.$accountName.blob.core.windows.net", accountSecretKey) + + conf + } + +} + final case class LocalBucket(path: String, params: Map[String, String]) extends Bucket { override val bucketPath: String = path @@ -88,10 +116,11 @@ object Bucket extends LazyLogging { val scheme = getScheme(path) scheme match { - case "s3a" => S3Bucket(path, params) - case "gs" => GCSBucket(path, params) - case "file" => LocalBucket(path, params) - case _ => throw new IllegalArgumentException(s"Unknown path scheme $scheme") + case "s3a" => S3Bucket(path, params) + case "gs" => GCSBucket(path, params) + case "wasb" | "wasbs" => AzureBlobBucket(path, params) + case "file" => LocalBucket(path, params) + case _ => throw new IllegalArgumentException(s"Unknown path scheme $scheme") } } @@ -135,4 +164,7 @@ object Bucket extends LazyLogging { final val GCS_PARAMETERS: Seq[String] = Seq("GCS_PROJECT_ID", "GCS_KEYFILE_PATH") + final val AZURE_PARAMETERS: Seq[String] = + Seq("AZURE_ACCOUNT_NAME", "AZURE_SECRET_KEY") + } From f4c800f28575a3e1b48dd4ce6632d0bf6a0a5215 Mon Sep 17 00:00:00 2001 From: Muhammet Orazov Date: Thu, 6 Dec 2018 15:38:16 +0100 Subject: [PATCH 3/5] Change key value separator for serialization / deserialization Currently it is `=`. This is problem for instance if the secret / access keys contains `==` (think Base64). Change it into `:=:`. 
---
 README.md                                              | 2 +-
 src/main/scala/com/exasol/cloudetl/bucket/Bucket.scala | 4 ++--
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/README.md b/README.md
index 5e9fd305..da6cad9b 100644
--- a/README.md
+++ b/README.md
@@ -143,7 +143,7 @@ SELECT * FROM SALES_POSITIONS LIMIT 10;
 ```sql
 IMPORT INTO SALES_POSITIONS
 FROM SCRIPT ETL.IMPORT_PATH WITH
-  BUCKET_PATH = 'wasbs://<CONTAINER_NAME>@<ACCOUNT_NAME>.blob.core.windows.net/sales-positions/*'
+  BUCKET_PATH = 'wasbs://<CONTAINER_NAME>@<ACCOUNT_NAME>.blob.core.windows.net/sales-positions/*'
   AZURE_ACCOUNT_NAME = 'MY_AZURE_STORAGE_ACCOUNT_NAME'
   AZURE_SECRET_KEY = 'MY_AZURE_STORAGE_SECRET_KEY'
   PARALLELISM = 'nproc()*10';

diff --git a/src/main/scala/com/exasol/cloudetl/bucket/Bucket.scala b/src/main/scala/com/exasol/cloudetl/bucket/Bucket.scala
index 7f5348a6..1e280636 100644
--- a/src/main/scala/com/exasol/cloudetl/bucket/Bucket.scala
+++ b/src/main/scala/com/exasol/cloudetl/bucket/Bucket.scala
@@ -144,14 +144,14 @@ object Bucket extends LazyLogging {
   def mapToStr(params: Map[String, String]): String = {
     val selectedParams = (params -- Seq("PARALLELISM"))
-    selectedParams.map { case (k, v) => s"$k=$v" }.mkString(";")
+    selectedParams.map { case (k, v) => s"$k:=:$v" }.mkString(";")
   }
 
   def strToMap(str: String): Map[String, String] =
     str
       .split(";")
       .map { word =>
-        val kv = word.split("=")
+        val kv = word.split(":=:")
         kv(0) -> kv(1)
       }
       .toMap

From 11d22972af5f6e0e12e7508710d92e44a27fa2d3 Mon Sep 17 00:00:00 2001
From: Muhammet Orazov
Date: Thu, 6 Dec 2018 15:50:56 +0100
Subject: [PATCH 4/5] Fix tests after changing separator

---
 .../com/exasol/cloudetl/scriptclasses/ImportPathSuite.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/src/test/scala/com/exasol/cloudetl/scriptclasses/ImportPathSuite.scala b/src/test/scala/com/exasol/cloudetl/scriptclasses/ImportPathSuite.scala
index 0a8fe23e..9a58e602 100644
--- a/src/test/scala/com/exasol/cloudetl/scriptclasses/ImportPathSuite.scala
+++ b/src/test/scala/com/exasol/cloudetl/scriptclasses/ImportPathSuite.scala
@@ -17,8 +17,8 @@ class ImportPathSuite extends BaseImportSuite {
     when(exaSpec.getParameters()).thenReturn(params.asJava)
 
     val rest =
-      s"""BUCKET_PATH=$s3BucketPath;S3_ENDPOINT=$s3Endpoint;""" +
-        s"""S3_ACCESS_KEY=$s3AccessKey;S3_SECRET_KEY=$s3SecretKey"""
+      s"""BUCKET_PATH:=:$s3BucketPath;S3_ENDPOINT:=:$s3Endpoint;""" +
+        s"""S3_ACCESS_KEY:=:$s3AccessKey;S3_SECRET_KEY:=:$s3SecretKey"""
 
     val sqlExpected =
       s"""

From 96bc6cc786b1d9f5f002ddca78c47e6d4f0bec00 Mon Sep 17 00:00:00 2001
From: Muhammet Orazov
Date: Thu, 6 Dec 2018 17:31:09 +0100
Subject: [PATCH 5/5] Add tests for bucket classes

Make codecov happy.
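The suite below exercises the `Bucket` factory end to end. Roughly, the
usage under test looks like this (paths and credentials are made up):

```scala
// The scheme of BUCKET_PATH selects the concrete bucket implementation,
// and createConfiguration() returns a Hadoop Configuration wired for it.
val bucket = Bucket(
  Map(
    Bucket.BUCKET_PATH   -> "wasbs://mycontainer@account1.blob.core.windows.net/data/*",
    "AZURE_ACCOUNT_NAME" -> "account1",
    "AZURE_SECRET_KEY"   -> "dummy-key"
  )
)
val conf = bucket.createConfiguration() // here: an AzureBlobBucket configuration
```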
---
 .../com/exasol/cloudetl/bucket/Bucket.scala  |  7 +-
 .../exasol/cloudetl/bucket/BucketSuite.scala | 86 +++++++++++++++++++
 2 files changed, 91 insertions(+), 2 deletions(-)
 create mode 100644 src/test/scala/com/exasol/cloudetl/bucket/BucketSuite.scala

diff --git a/src/main/scala/com/exasol/cloudetl/bucket/Bucket.scala b/src/main/scala/com/exasol/cloudetl/bucket/Bucket.scala
index 1e280636..68cc77f2 100644
--- a/src/main/scala/com/exasol/cloudetl/bucket/Bucket.scala
+++ b/src/main/scala/com/exasol/cloudetl/bucket/Bucket.scala
@@ -104,8 +104,10 @@ final case class LocalBucket(path: String, params: Map[String, String]) extends
 
   override def validate(): Unit = ()
 
-  override def createConfiguration(): Configuration =
+  override def createConfiguration(): Configuration = {
+    validate()
     new Configuration()
+  }
 
 }
@@ -120,7 +122,8 @@ object Bucket extends LazyLogging {
       case "gs"             => GCSBucket(path, params)
       case "wasb" | "wasbs" => AzureBlobBucket(path, params)
      case "file"           => LocalBucket(path, params)
-      case _                => throw new IllegalArgumentException(s"Unknown path scheme $scheme")
+      case _ =>
+        throw new IllegalArgumentException(s"Unsupported path scheme $scheme")
     }
   }

diff --git a/src/test/scala/com/exasol/cloudetl/bucket/BucketSuite.scala b/src/test/scala/com/exasol/cloudetl/bucket/BucketSuite.scala
new file mode 100644
index 00000000..d45dc70d
--- /dev/null
+++ b/src/test/scala/com/exasol/cloudetl/bucket/BucketSuite.scala
@@ -0,0 +1,86 @@
+package com.exasol.cloudetl.bucket
+
+import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem
+import org.apache.hadoop.fs.azure.NativeAzureFileSystem
+import org.apache.hadoop.fs.azure.Wasb
+import org.apache.hadoop.fs.azure.Wasbs
+import org.apache.hadoop.fs.s3a.S3AFileSystem
+import org.scalatest.FunSuite
+import org.scalatest.Matchers
+
+@SuppressWarnings(Array("org.wartremover.warts.IsInstanceOf"))
+class BucketSuite extends FunSuite with Matchers {
+
+  test("throws an exception if the scheme is not supported") {
+    val thrown = intercept[IllegalArgumentException] {
+      Bucket(Map(Bucket.BUCKET_PATH -> "xyz:/bucket/files*"))
+    }
+    assert(thrown.getMessage === "Unsupported path scheme xyz")
+  }
+
+  test("creates a LocalBucket with path parameter") {
+    val bucket = Bucket(Map(Bucket.BUCKET_PATH -> "file://local/path/bucket/"))
+
+    assert(bucket.isInstanceOf[LocalBucket])
+  }
+
+  test("creates an S3Bucket with given parameters") {
+    val s3params = Map(
+      Bucket.BUCKET_PATH -> "s3a://my-bucket/",
+      "S3_ENDPOINT" -> "eu-central-1",
+      "S3_ACCESS_KEY" -> "abc",
+      "S3_SECRET_KEY" -> "xyz"
+    )
+
+    val bucket = Bucket(s3params)
+    val conf = bucket.createConfiguration()
+
+    assert(bucket.isInstanceOf[S3Bucket])
+    assert(conf.get("fs.s3a.impl") === classOf[S3AFileSystem].getName)
+    assert(conf.get("fs.s3a.endpoint") === "eu-central-1")
+    assert(conf.get("fs.s3a.access.key") === "abc")
+    assert(conf.get("fs.s3a.secret.key") === "xyz")
+  }
+
+  test("creates a GCSBucket with given parameters") {
+    val gcsParams = Map(
+      Bucket.BUCKET_PATH -> "gs://my-bucket/",
+      "GCS_PROJECT_ID" -> "projX",
+      "GCS_KEYFILE_PATH" -> "/bucketfs/bucket1/projX.json"
+    )
+
+    val bucket = Bucket(gcsParams)
+    val conf = bucket.createConfiguration()
+
+    assert(bucket.isInstanceOf[GCSBucket])
+    assert(conf.get("fs.gs.impl") === classOf[GoogleHadoopFileSystem].getName)
+    assert(conf.get("fs.gs.project.id") === "projX")
+    assert(conf.get("fs.gs.auth.service.account.json.keyfile") === "/bucketfs/bucket1/projX.json")
+  }
+
+  test("creates an AzureBlobBucket with given parameters") {
+    val azureBlobParams = Map(
+      Bucket.BUCKET_PATH -> "wasbs://container@account1/parquet-bucket/",
+      "AZURE_ACCOUNT_NAME" -> "account1",
+      "AZURE_SECRET_KEY" -> "secret"
+    )
+
+    val bucket = Bucket(azureBlobParams)
+    val conf = bucket.createConfiguration()
+
+    assert(bucket.isInstanceOf[AzureBlobBucket])
+    assert(conf.get("fs.azure.account.key.account1.blob.core.windows.net") === "secret")
+    val mappings = Map(
+      "fs.azure" -> classOf[NativeAzureFileSystem].getName,
+      "fs.wasb.impl" -> classOf[NativeAzureFileSystem].getName,
+      "fs.wasbs.impl" -> classOf[NativeAzureFileSystem].getName,
+      "fs.AbstractFileSystem.wasb.impl" -> classOf[Wasb].getName,
+      "fs.AbstractFileSystem.wasbs.impl" -> classOf[Wasbs].getName
+    )
+    mappings.foreach {
+      case (given, expected) =>
+        assert(conf.get(given) === expected)
+    }
+  }
+
+}