From b8114801d0cbef491d6674432d229273e5d732ca Mon Sep 17 00:00:00 2001 From: Muhammet Orazov Date: Wed, 13 Nov 2019 13:46:25 +0100 Subject: [PATCH 1/4] Add Exasol named connection object support for AWS S3 access. --- CHANGES.md | 2 +- .../cloudetl/bucket/AzureAdlsBucket.scala | 4 + .../cloudetl/bucket/AzureBlobBucket.scala | 4 + .../com/exasol/cloudetl/bucket/Bucket.scala | 34 +++----- .../exasol/cloudetl/bucket/GCSBucket.scala | 4 + .../exasol/cloudetl/bucket/LocalBucket.scala | 4 + .../com/exasol/cloudetl/bucket/S3Bucket.scala | 23 +++-- .../exasol/cloudetl/bucket/SecureBucket.scala | 79 ++++++++++++++++++ .../cloudetl/scriptclasses/ExportTable.scala | 2 +- .../cloudetl/scriptclasses/ImportFiles.scala | 2 +- .../scriptclasses/ImportMetadata.scala | 2 +- .../cloudetl/scriptclasses/ImportPath.scala | 8 +- .../StorageConnectionInformation.scala | 12 +++ .../cloudetl/storage/StorageProperties.scala | 43 +++++++++- .../cloudetl/bucket/AbstractBucketTest.scala | 5 ++ .../cloudetl/bucket/AzureBlobBucketTest.scala | 12 +-- .../exasol/cloudetl/bucket/BucketTest.scala | 10 +-- .../cloudetl/bucket/SecureBucketTest.scala | 83 +++++++++++++++++++ .../scriptclasses/ExportPathTest.scala | 4 +- .../scriptclasses/ImportPathTest.scala | 4 +- .../cloudetl/sink/BatchSizedSinkTest.scala | 4 +- .../StorageConnectionInformationTest.scala | 17 ++++ .../storage/StoragePropertiesTest.scala | 58 ++++++++++++- 23 files changed, 363 insertions(+), 57 deletions(-) create mode 100644 src/main/scala/com/exasol/cloudetl/bucket/SecureBucket.scala create mode 100644 src/main/scala/com/exasol/cloudetl/storage/StorageConnectionInformation.scala create mode 100644 src/test/scala/com/exasol/cloudetl/bucket/SecureBucketTest.scala create mode 100644 src/test/scala/com/exasol/cloudetl/storage/StorageConnectionInformationTest.scala diff --git a/CHANGES.md b/CHANGES.md index 43e719ce..93330d75 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -2,7 +2,7 @@ > 2019 OCT 31 :jack_o_lantern: -* Adds Apach Kafka consumer UDF to import Avro formatted data from Kafka +* Adds Apache Kafka consumer UDF to import Avro formatted data from Kafka clusters. [#40](https://github.com/exasol/cloud-storage-etl-udfs/issues/40) [#39](https://github.com/exasol/cloud-storage-etl-udfs/pull/39) [#48](https://github.com/exasol/cloud-storage-etl-udfs/pull/48) diff --git a/src/main/scala/com/exasol/cloudetl/bucket/AzureAdlsBucket.scala b/src/main/scala/com/exasol/cloudetl/bucket/AzureAdlsBucket.scala index 7dc8ccb9..38171e54 100644 --- a/src/main/scala/com/exasol/cloudetl/bucket/AzureAdlsBucket.scala +++ b/src/main/scala/com/exasol/cloudetl/bucket/AzureAdlsBucket.scala @@ -17,6 +17,10 @@ final case class AzureAdlsBucket(path: String, params: StorageProperties) extend /** @inheritdoc */ override val properties: StorageProperties = params + /** @inheritdoc */ + override def validate(): Unit = + validateRequiredProperties() + /** * Returns the list of required property keys for Azure Data Lake * Storage. 
diff --git a/src/main/scala/com/exasol/cloudetl/bucket/AzureBlobBucket.scala b/src/main/scala/com/exasol/cloudetl/bucket/AzureBlobBucket.scala index 098254d7..3d8518ef 100644 --- a/src/main/scala/com/exasol/cloudetl/bucket/AzureBlobBucket.scala +++ b/src/main/scala/com/exasol/cloudetl/bucket/AzureBlobBucket.scala @@ -18,6 +18,10 @@ final case class AzureBlobBucket(path: String, params: StorageProperties) extend /** @inheritdoc */ override val properties: StorageProperties = params + /** @inheritdoc */ + override def validate(): Unit = + validateRequiredProperties() + /** * Returns the list of required property keys for Azure Blob Storage. */ diff --git a/src/main/scala/com/exasol/cloudetl/bucket/Bucket.scala b/src/main/scala/com/exasol/cloudetl/bucket/Bucket.scala index 2d311458..4f2ff4a3 100644 --- a/src/main/scala/com/exasol/cloudetl/bucket/Bucket.scala +++ b/src/main/scala/com/exasol/cloudetl/bucket/Bucket.scala @@ -20,7 +20,7 @@ import org.apache.hadoop.fs.Path * * All specific implementation of a bucket should extend this class. */ -abstract class Bucket { +abstract class Bucket extends LazyLogging { /** The path string of the bucket. */ val bucketPath: String @@ -34,11 +34,19 @@ abstract class Bucket { */ def getRequiredProperties(): Seq[String] - /** Validates that all required parameter key values are available. */ - final def validate(): Unit = - validateRequiredProperties() + /** + * Creates a Hadoop [[org.apache.hadoop.conf.Configuration]] for this + * specific bucket type. + */ + def getConfiguration(): Configuration - private[this] def validateRequiredProperties(): Unit = + /** + * Validates that user provided key-value properties are available for + * this bucket implementation. + */ + def validate(): Unit + + protected[this] final def validateRequiredProperties(): Unit = getRequiredProperties().foreach { key => if (!properties.containsKey(key)) { throw new IllegalArgumentException( @@ -47,12 +55,6 @@ abstract class Bucket { } } - /** - * Creates a Hadoop [[org.apache.hadoop.conf.Configuration]] for this - * specific bucket type. - */ - def getConfiguration(): Configuration - /** * The Hadoop [[org.apache.hadoop.fs.FileSystem]] for this specific * bucket path. @@ -101,14 +103,4 @@ object Bucket extends LazyLogging { } } - /** - * Creates specific [[Bucket]] class using the path scheme from - * key-value properties. - * - * @param params The key value parameters - * @return A [[Bucket]] class for the given path - */ - def apply(params: Map[String, String]): Bucket = - apply(StorageProperties(params)) - } diff --git a/src/main/scala/com/exasol/cloudetl/bucket/GCSBucket.scala b/src/main/scala/com/exasol/cloudetl/bucket/GCSBucket.scala index d85d107e..649ffdc7 100644 --- a/src/main/scala/com/exasol/cloudetl/bucket/GCSBucket.scala +++ b/src/main/scala/com/exasol/cloudetl/bucket/GCSBucket.scala @@ -16,6 +16,10 @@ final case class GCSBucket(path: String, params: StorageProperties) extends Buck /** @inheritdoc */ override val properties: StorageProperties = params + /** @inheritdoc */ + override def validate(): Unit = + validateRequiredProperties() + /** * Returns the list of required property keys for Google Cloud * Storage. 
diff --git a/src/main/scala/com/exasol/cloudetl/bucket/LocalBucket.scala b/src/main/scala/com/exasol/cloudetl/bucket/LocalBucket.scala index 5f9e6b5f..541d2524 100644 --- a/src/main/scala/com/exasol/cloudetl/bucket/LocalBucket.scala +++ b/src/main/scala/com/exasol/cloudetl/bucket/LocalBucket.scala @@ -18,6 +18,10 @@ final case class LocalBucket(path: String, params: StorageProperties) extends Bu /** @inheritdoc */ override def getRequiredProperties(): Seq[String] = Seq.empty[String] + /** @inheritdoc */ + override def validate(): Unit = + validateRequiredProperties() + /** @inheritdoc */ override def getConfiguration(): Configuration = { validate() diff --git a/src/main/scala/com/exasol/cloudetl/bucket/S3Bucket.scala b/src/main/scala/com/exasol/cloudetl/bucket/S3Bucket.scala index 0e8c8185..2bba39c0 100644 --- a/src/main/scala/com/exasol/cloudetl/bucket/S3Bucket.scala +++ b/src/main/scala/com/exasol/cloudetl/bucket/S3Bucket.scala @@ -5,7 +5,9 @@ import com.exasol.cloudetl.storage.StorageProperties import org.apache.hadoop.conf.Configuration /** A [[Bucket]] implementation for the AWS S3 */ -final case class S3Bucket(path: String, params: StorageProperties) extends Bucket { +final case class S3Bucket(path: String, params: StorageProperties) + extends Bucket + with SecureBucket { private[this] val S3_ENDPOINT: String = "S3_ENDPOINT" private[this] val S3_ACCESS_KEY: String = "S3_ACCESS_KEY" @@ -17,9 +19,19 @@ final case class S3Bucket(path: String, params: StorageProperties) extends Bucke /** @inheritdoc */ override val properties: StorageProperties = params + /** @inheritdoc */ + override val accountName: String = S3_ACCESS_KEY + + /** @inheritdoc */ + override val accountSecret: String = S3_SECRET_KEY + /** Returns the list of required property keys for AWS S3 Storage. */ - override def getRequiredProperties(): Seq[String] = - Seq(S3_ENDPOINT, S3_ACCESS_KEY, S3_SECRET_KEY) + override def getRequiredProperties(): Seq[String] = Seq(S3_ENDPOINT) + + override def validate(): Unit = { + validateRequiredProperties() + validateConnectionProperties() + } /** * @inheritdoc @@ -34,8 +46,9 @@ final case class S3Bucket(path: String, params: StorageProperties) extends Bucke conf.set("fs.file.impl", classOf[org.apache.hadoop.fs.LocalFileSystem].getName) conf.set("fs.s3a.impl", classOf[org.apache.hadoop.fs.s3a.S3AFileSystem].getName) conf.set("fs.s3a.endpoint", properties.getString(S3_ENDPOINT)) - conf.set("fs.s3a.access.key", properties.getString(S3_ACCESS_KEY)) - conf.set("fs.s3a.secret.key", properties.getString(S3_SECRET_KEY)) + val storageConnectionInfo = getStorageConnectionInformation() + conf.set("fs.s3a.access.key", storageConnectionInfo.getUser()) + conf.set("fs.s3a.secret.key", storageConnectionInfo.getPassword()) conf } diff --git a/src/main/scala/com/exasol/cloudetl/bucket/SecureBucket.scala b/src/main/scala/com/exasol/cloudetl/bucket/SecureBucket.scala new file mode 100644 index 00000000..d47241f1 --- /dev/null +++ b/src/main/scala/com/exasol/cloudetl/bucket/SecureBucket.scala @@ -0,0 +1,79 @@ +package com.exasol.cloudetl.bucket + +import com.exasol.cloudetl.storage.StorageConnectionInformation + +import com.typesafe.scalalogging.LazyLogging + +/** + * A trait that provides methods to access [[Bucket]]s using secure + * access credentials. + */ +trait SecureBucket extends LazyLogging { self: Bucket => + + /** + * A placeholder variable for different access account methods in the + * specific bucket implementations. 
+ * + * Having these variables abstracted makes the bucket implementations + * cleaner. + * + * For example, when accessing S3 bucket, accountName is set as {@code + * AWS_ACCESS_KEY}. + */ + val accountName: String + + /** + * A placeholder variable for different access secret methods in the + * specific bucket implementations. + * + * For example, when accessing S3 bucket, accountSecret is set as + * {@code AWS_SECRET_KEY}. + */ + val accountSecret: String + + /** + * Validates that the named connection object or access credentials + * are available. + */ + protected[this] final def validateConnectionProperties(): Unit = { + if (hasSecureProperties()) { + logger.info( + s"Using secure credentials $accountName and $accountSecret properties is deprecated. " + + "Please use an Exasol named connection object via CONNECTION_NAME property." + ) + } + val connectionExceptionMessage = + s"Please provide either only CONNECTION_NAME property or $accountName " + + s"and $accountSecret property pairs, but not the both!" + if (properties.hasNamedConnection()) { + if (hasSecureProperties()) { + throw new IllegalArgumentException(connectionExceptionMessage) + } + } else { + if (!hasSecureProperties()) { + throw new IllegalArgumentException(connectionExceptionMessage) + } + } + } + + private[this] def hasSecureProperties(): Boolean = + properties.containsKey(accountName) && properties.containsKey(accountSecret) + + /** + * Returns the [[com.exasol.ExaConnectionInformation]] Exasol named + * connection information object for this bucket. + */ + final def getStorageConnectionInformation(): StorageConnectionInformation = + if (!properties.hasNamedConnection() && hasSecureProperties()) { + StorageConnectionInformation( + properties.getString(accountName), + properties.getString(accountSecret) + ) + } else if (properties.hasNamedConnection()) { + val exaConnectionInfo = properties.getConnectionInformation() + StorageConnectionInformation(exaConnectionInfo.getUser(), exaConnectionInfo.getPassword()) + } else { + throw new IllegalArgumentException("Please provide a CONNECTION_NAME property!") + } + +} diff --git a/src/main/scala/com/exasol/cloudetl/scriptclasses/ExportTable.scala b/src/main/scala/com/exasol/cloudetl/scriptclasses/ExportTable.scala index ab2da7cc..e9906907 100644 --- a/src/main/scala/com/exasol/cloudetl/scriptclasses/ExportTable.scala +++ b/src/main/scala/com/exasol/cloudetl/scriptclasses/ExportTable.scala @@ -17,7 +17,7 @@ import com.typesafe.scalalogging.LazyLogging object ExportTable extends LazyLogging { def run(metadata: ExaMetadata, iterator: ExaIterator): Unit = { - val storageProperties = StorageProperties(iterator.getString(1)) + val storageProperties = StorageProperties(iterator.getString(1), metadata) val bucket = Bucket(storageProperties) val srcColumnNames = iterator.getString(2).split("\\.") val firstColumnIdx = 3 diff --git a/src/main/scala/com/exasol/cloudetl/scriptclasses/ImportFiles.scala b/src/main/scala/com/exasol/cloudetl/scriptclasses/ImportFiles.scala index 7392caae..f8196a0a 100644 --- a/src/main/scala/com/exasol/cloudetl/scriptclasses/ImportFiles.scala +++ b/src/main/scala/com/exasol/cloudetl/scriptclasses/ImportFiles.scala @@ -15,7 +15,7 @@ import org.apache.hadoop.fs.Path object ImportFiles extends LazyLogging { def run(metadata: ExaMetadata, iterator: ExaIterator): Unit = { - val storageProperties = StorageProperties(iterator.getString(1)) + val storageProperties = StorageProperties(iterator.getString(1), metadata) val fileFormat = storageProperties.getFileFormat() 
val bucket = Bucket(storageProperties) diff --git a/src/main/scala/com/exasol/cloudetl/scriptclasses/ImportMetadata.scala b/src/main/scala/com/exasol/cloudetl/scriptclasses/ImportMetadata.scala index 41c6f178..7ea08759 100644 --- a/src/main/scala/com/exasol/cloudetl/scriptclasses/ImportMetadata.scala +++ b/src/main/scala/com/exasol/cloudetl/scriptclasses/ImportMetadata.scala @@ -17,7 +17,7 @@ object ImportMetadata extends LazyLogging { + s"with parallelism: ${parallelism.toString}" ) - val storageProperties = StorageProperties(iterator.getString(1)) + val storageProperties = StorageProperties(iterator.getString(1), metadata) val bucket = Bucket(storageProperties) val paths = bucket.getPaths().filter(p => !p.getName().startsWith("_")) logger.info(s"Total number of files: ${paths.size} in bucket path: $bucketPath") diff --git a/src/main/scala/com/exasol/cloudetl/scriptclasses/ImportPath.scala b/src/main/scala/com/exasol/cloudetl/scriptclasses/ImportPath.scala index 470026ad..762dad3e 100644 --- a/src/main/scala/com/exasol/cloudetl/scriptclasses/ImportPath.scala +++ b/src/main/scala/com/exasol/cloudetl/scriptclasses/ImportPath.scala @@ -17,18 +17,18 @@ object ImportPath { val bucket = Bucket(storageProperties) bucket.validate() + val scriptSchema = metadata.getScriptSchema val bucketPath = bucket.bucketPath val parallelism = storageProperties.getParallelism("nproc()") - val storagePropertiesStr = storageProperties.mkString() - val scriptSchema = metadata.getScriptSchema + val storagePropertiesAsString = storageProperties.mkString() s"""SELECT | $scriptSchema.IMPORT_FILES( - | '$bucketPath', '$storagePropertiesStr', filename + | '$bucketPath', '$storagePropertiesAsString', filename |) |FROM ( | SELECT $scriptSchema.IMPORT_METADATA( - | '$bucketPath', '$storagePropertiesStr', $parallelism + | '$bucketPath', '$storagePropertiesAsString', $parallelism | ) |) |GROUP BY diff --git a/src/main/scala/com/exasol/cloudetl/storage/StorageConnectionInformation.scala b/src/main/scala/com/exasol/cloudetl/storage/StorageConnectionInformation.scala new file mode 100644 index 00000000..ab96fdf6 --- /dev/null +++ b/src/main/scala/com/exasol/cloudetl/storage/StorageConnectionInformation.scala @@ -0,0 +1,12 @@ +package com.exasol.cloudetl.storage + +import com.exasol.ExaConnectionInformation +import com.exasol.ExaConnectionInformation.ConnectionType + +final case class StorageConnectionInformation(user: String, password: String) + extends ExaConnectionInformation { + override def getAddress(): String = "" + override def getPassword(): String = password + override def getUser(): String = user + override def getType(): ConnectionType = ConnectionType.PASSWORD +} diff --git a/src/main/scala/com/exasol/cloudetl/storage/StorageProperties.scala b/src/main/scala/com/exasol/cloudetl/storage/StorageProperties.scala index a7a79c20..3205cc5d 100644 --- a/src/main/scala/com/exasol/cloudetl/storage/StorageProperties.scala +++ b/src/main/scala/com/exasol/cloudetl/storage/StorageProperties.scala @@ -2,6 +2,8 @@ package com.exasol.cloudetl.storage import java.net.URI +import com.exasol.ExaConnectionInformation +import com.exasol.ExaMetadata import com.exasol.cloudetl.common.AbstractProperties import com.exasol.cloudetl.common.CommonProperties @@ -11,8 +13,10 @@ import com.exasol.cloudetl.common.CommonProperties * provided key-value parameters for storage import and export * user-defined-functions (udfs). 
*/
-class StorageProperties(private val properties: Map[String, String])
-    extends AbstractProperties(properties) {
+class StorageProperties(
+  private val properties: Map[String, String],
+  private val exaMetadata: Option[ExaMetadata]
+) extends AbstractProperties(properties) {
 
   import StorageProperties._
 
@@ -46,6 +50,20 @@ class StorageProperties(
   final def getParallelism(defaultValue: => String): String =
     get(PARALLELISM).fold(defaultValue)(identity)
 
+  /**
+   * Returns an Exasol [[ExaConnectionInformation]] named connection
+   * information object.
+   */
+  @SuppressWarnings(Array("org.wartremover.warts.AsInstanceOf"))
+  final def getConnectionInformation(): ExaConnectionInformation =
+    exaMetadata.fold {
+      throw new IllegalArgumentException("Exasol metadata is None!")
+    }(_.getConnection(getString(CONNECTION_NAME)))
+
+  /** Checks if the Exasol named connection property is provided. */
+  final def hasNamedConnection(): Boolean =
+    containsKey(CONNECTION_NAME)
+
   /**
    * Returns a string value of key-value property pairs.
    *
@@ -72,9 +90,26 @@ object StorageProperties extends CommonProperties {
   /** An optional property key name for the parallelism. */
   private[storage] final val PARALLELISM: String = "PARALLELISM"
 
-  /** Returns [[StorageProperties]] from key-value pairs map. */
+  /** An optional property key name for the named connection object. */
+  private[storage] final val CONNECTION_NAME: String = "CONNECTION_NAME"
+
+  /**
+   * Returns [[StorageProperties]] from a key-value map and an
+   * [[ExaMetadata]] metadata object.
+   */
+  def apply(params: Map[String, String], metadata: ExaMetadata): StorageProperties =
+    new StorageProperties(params, Option(metadata))
+
+  /** Returns [[StorageProperties]] from only a key-value map. */
   def apply(params: Map[String, String]): StorageProperties =
-    new StorageProperties(params)
+    new StorageProperties(params, None)
+
+  /**
+   * Returns [[StorageProperties]] from a properly separated string and
+   * an [[ExaMetadata]] metadata object.
+   */
+  def apply(string: String, metadata: ExaMetadata): StorageProperties =
+    apply(mapFromString(string), metadata)
 
   /** Returns [[StorageProperties]] from a properly separated string.
*/ def apply(string: String): StorageProperties = diff --git a/src/test/scala/com/exasol/cloudetl/bucket/AbstractBucketTest.scala b/src/test/scala/com/exasol/cloudetl/bucket/AbstractBucketTest.scala index bfffe58c..d7c05992 100644 --- a/src/test/scala/com/exasol/cloudetl/bucket/AbstractBucketTest.scala +++ b/src/test/scala/com/exasol/cloudetl/bucket/AbstractBucketTest.scala @@ -1,5 +1,7 @@ package com.exasol.cloudetl.bucket +import com.exasol.cloudetl.storage.StorageProperties + import org.scalatest.BeforeAndAfterEach import org.scalatest.FunSuite @@ -14,4 +16,7 @@ class AbstractBucketTest extends FunSuite with BeforeAndAfterEach { () } + protected[this] final def getBucket(params: Map[String, String]): Bucket = + Bucket(StorageProperties(params)) + } diff --git a/src/test/scala/com/exasol/cloudetl/bucket/AzureBlobBucketTest.scala b/src/test/scala/com/exasol/cloudetl/bucket/AzureBlobBucketTest.scala index 27f9c601..49d4fbb4 100644 --- a/src/test/scala/com/exasol/cloudetl/bucket/AzureBlobBucketTest.scala +++ b/src/test/scala/com/exasol/cloudetl/bucket/AzureBlobBucketTest.scala @@ -31,15 +31,15 @@ class AzureBlobBucketTest extends AbstractBucketTest { test("getConfiguration throws if account name is not provided") { properties = defaultProperties val thrown = intercept[IllegalArgumentException] { - assertAzureBlobBucket(Bucket(properties)) + assertAzureBlobBucket(getBucket(properties)) } assert(thrown.getMessage === "Please provide a value for the AZURE_ACCOUNT_NAME property!") } - test("getConfiguration throws if neither secret key nor sas tokan account is provided") { + test("getConfiguration throws if neither secret key nor sas token account is provided") { properties = defaultProperties ++ Map("AZURE_ACCOUNT_NAME" -> "account1") val thrown = intercept[IllegalArgumentException] { - assertAzureBlobBucket(Bucket(properties)) + assertAzureBlobBucket(getBucket(properties)) } assert( thrown.getMessage === "Please provide a value for either " + @@ -52,7 +52,7 @@ class AzureBlobBucketTest extends AbstractBucketTest { "AZURE_ACCOUNT_NAME" -> "account1", "AZURE_SECRET_KEY" -> "secret" ) - val bucket = Bucket(properties) + val bucket = getBucket(properties) assertAzureBlobBucket(bucket) assert( bucket @@ -67,7 +67,7 @@ class AzureBlobBucketTest extends AbstractBucketTest { "AZURE_SAS_TOKEN" -> "token" ) val thrown = intercept[IllegalArgumentException] { - assertAzureBlobBucket(Bucket(properties)) + assertAzureBlobBucket(getBucket(properties)) } assert(thrown.getMessage === "Please provide a value for the AZURE_CONTAINER_NAME property!") } @@ -78,7 +78,7 @@ class AzureBlobBucketTest extends AbstractBucketTest { "AZURE_SAS_TOKEN" -> "token", "AZURE_CONTAINER_NAME" -> "container1" ) - val bucket = Bucket(properties) + val bucket = getBucket(properties) assertAzureBlobBucket(bucket) assert( bucket diff --git a/src/test/scala/com/exasol/cloudetl/bucket/BucketTest.scala b/src/test/scala/com/exasol/cloudetl/bucket/BucketTest.scala index 8d9132f6..bc06bf23 100644 --- a/src/test/scala/com/exasol/cloudetl/bucket/BucketTest.scala +++ b/src/test/scala/com/exasol/cloudetl/bucket/BucketTest.scala @@ -9,14 +9,14 @@ class BucketTest extends AbstractBucketTest { test("apply throws if the scheme is not supported") { properties = Map(PATH -> "xyz:/bucket/files*", FORMAT -> "ORC") val thrown = intercept[IllegalArgumentException] { - Bucket(properties) + getBucket(properties) } assert(thrown.getMessage === "Unsupported path scheme xyz!") } test("apply returns LocalBucket") { properties = Map(PATH -> 
"file://local/path/bucket/", FORMAT -> "ORC") - val bucket = Bucket(properties) + val bucket = getBucket(properties) assert(bucket.isInstanceOf[LocalBucket]) } @@ -28,7 +28,7 @@ class BucketTest extends AbstractBucketTest { "S3_ACCESS_KEY" -> "abc", "S3_SECRET_KEY" -> "xyz" ) - val bucket = Bucket(properties) + val bucket = getBucket(properties) val conf = bucket.getConfiguration() assert(bucket.isInstanceOf[S3Bucket]) @@ -45,7 +45,7 @@ class BucketTest extends AbstractBucketTest { "GCS_PROJECT_ID" -> "projX", "GCS_KEYFILE_PATH" -> "/bucketfs/bucket1/projX.json" ) - val bucket = Bucket(properties) + val bucket = getBucket(properties) val conf = bucket.getConfiguration() assert(bucket.isInstanceOf[GCSBucket]) @@ -62,7 +62,7 @@ class BucketTest extends AbstractBucketTest { "AZURE_CLIENT_SECRET" -> "client_secret", "AZURE_DIRECTORY_ID" -> "directory_id_secret" ) - val bucket = Bucket(properties) + val bucket = getBucket(properties) assert(bucket.isInstanceOf[AzureAdlsBucket]) val conf = bucket.getConfiguration() diff --git a/src/test/scala/com/exasol/cloudetl/bucket/SecureBucketTest.scala b/src/test/scala/com/exasol/cloudetl/bucket/SecureBucketTest.scala new file mode 100644 index 00000000..816c6190 --- /dev/null +++ b/src/test/scala/com/exasol/cloudetl/bucket/SecureBucketTest.scala @@ -0,0 +1,83 @@ +package com.exasol.cloudetl.bucket + +import com.exasol.ExaConnectionInformation +import com.exasol.ExaMetadata +import com.exasol.cloudetl.storage.StorageConnectionInformation +import com.exasol.cloudetl.storage.StorageProperties + +import org.apache.hadoop.conf.Configuration +import org.mockito.Mockito.times +import org.mockito.Mockito.verify +import org.mockito.Mockito.when +import org.scalatest.mockito.MockitoSugar + +class SecureBucketTest extends AbstractBucketTest with MockitoSugar { + + test("validate throws if no access credentials are provided") { + val thrown = intercept[IllegalArgumentException] { + BaseSecureBucket(properties).validate() + } + assert(thrown.getMessage.contains("Please provide either only CONNECTION_NAME property or")) + } + + test("validate throws if both connection name and access properties are provided") { + properties = Map( + "CONNECTION_NAME" -> "named_connection", + "accountNameProperty" -> "user", + "accountSecretProperty" -> "secret" + ) + val thrown = intercept[IllegalArgumentException] { + BaseSecureBucket(properties).validate() + } + assert(thrown.getMessage.contains("Please provide either only CONNECTION_NAME property or")) + assert(thrown.getMessage.contains("property pairs, but not the both!")) + } + + test("getStorageConnectionInformation throws if no access properties are provided") { + val thrown = intercept[IllegalArgumentException] { + BaseSecureBucket(properties).getStorageConnectionInformation() + } + assert(thrown.getMessage === "Please provide a CONNECTION_NAME property!") + } + + test("getStorageConnectionInformation returns secure properties, no named connection") { + properties = Map("accountNameProperty" -> "account", "accountSecretProperty" -> "sekret") + val expected = StorageConnectionInformation("account", "sekret") + assert(BaseSecureBucket(properties).getStorageConnectionInformation() === expected) + } + + test("getStorageConnectionInformation returns connection info from Exasol named connection") { + properties = Map("CONNECTION_NAME" -> "connection_info") + val metadata = mock[ExaMetadata] + val connectionInfo: ExaConnectionInformation = new ExaConnectionInformation() { + override def getType(): 
ExaConnectionInformation.ConnectionType = + ExaConnectionInformation.ConnectionType.PASSWORD + override def getAddress(): String = "" + override def getUser(): String = "account" + override def getPassword(): String = "secRet" + } + when(metadata.getConnection("connection_info")).thenReturn(connectionInfo) + val expected = StorageConnectionInformation("account", "secRet") + assert(BaseSecureBucket(properties, metadata).getStorageConnectionInformation() === expected) + verify(metadata, times(1)).getConnection("connection_info") + } + + @SuppressWarnings(Array("org.wartremover.warts.DefaultArguments")) + private[this] case class BaseSecureBucket( + val params: Map[String, String], + val metadata: ExaMetadata = null + ) extends Bucket + with SecureBucket { + override val properties = StorageProperties(params, metadata) + + override val bucketPath = "local_path" + override def getRequiredProperties: Seq[String] = Seq.empty[String] + override def getConfiguration: Configuration = new Configuration() + override def validate(): Unit = { + validateRequiredProperties() + validateConnectionProperties() + } + override val accountName = "accountNameProperty" + override val accountSecret = "accountSecretProperty" + } +} diff --git a/src/test/scala/com/exasol/cloudetl/scriptclasses/ExportPathTest.scala b/src/test/scala/com/exasol/cloudetl/scriptclasses/ExportPathTest.scala index 2307d95d..fa417e66 100644 --- a/src/test/scala/com/exasol/cloudetl/scriptclasses/ExportPathTest.scala +++ b/src/test/scala/com/exasol/cloudetl/scriptclasses/ExportPathTest.scala @@ -36,14 +36,14 @@ class ExportPathTest extends PathTest { } test("generateSqlForExportSpec throws if required property is not set") { - val newProperties = properties - ("S3_ACCESS_KEY") + val newProperties = properties - ("S3_ENDPOINT") when(metadata.getScriptSchema()).thenReturn(schema) when(exportSpec.getParameters()).thenReturn(newProperties.asJava) val thrown = intercept[IllegalArgumentException] { ExportPath.generateSqlForExportSpec(metadata, exportSpec) } - assert(thrown.getMessage === "Please provide a value for the S3_ACCESS_KEY property!") + assert(thrown.getMessage === "Please provide a value for the S3_ENDPOINT property!") verify(exportSpec, times(1)).getParameters verify(exportSpec, never).getSourceColumnNames } diff --git a/src/test/scala/com/exasol/cloudetl/scriptclasses/ImportPathTest.scala b/src/test/scala/com/exasol/cloudetl/scriptclasses/ImportPathTest.scala index da29db6a..dc142b01 100644 --- a/src/test/scala/com/exasol/cloudetl/scriptclasses/ImportPathTest.scala +++ b/src/test/scala/com/exasol/cloudetl/scriptclasses/ImportPathTest.scala @@ -36,14 +36,14 @@ class ImportPathTest extends PathTest { } test("generateSqlForImportSpec throws if required property is not set") { - val newProperties = properties - ("S3_ACCESS_KEY") + val newProperties = properties - ("S3_ENDPOINT") when(metadata.getScriptSchema()).thenReturn(schema) when(importSpec.getParameters()).thenReturn(newProperties.asJava) val thrown = intercept[IllegalArgumentException] { ImportPath.generateSqlForImportSpec(metadata, importSpec) } - assert(thrown.getMessage === "Please provide a value for the S3_ACCESS_KEY property!") + assert(thrown.getMessage === "Please provide a value for the S3_ENDPOINT property!") verify(importSpec, times(1)).getParameters } diff --git a/src/test/scala/com/exasol/cloudetl/sink/BatchSizedSinkTest.scala b/src/test/scala/com/exasol/cloudetl/sink/BatchSizedSinkTest.scala index e439209b..423cc2ae 100644 --- 
a/src/test/scala/com/exasol/cloudetl/sink/BatchSizedSinkTest.scala +++ b/src/test/scala/com/exasol/cloudetl/sink/BatchSizedSinkTest.scala @@ -42,7 +42,7 @@ class BatchSizedSinkTest extends FunSuite with BeforeAndAfterEach with DummyReco test("export single file with default batch size") { val bucket = LocalBucket( outputPath.toUri.toString, - new StorageProperties(properties ++ Map("EXPORT_BATCH_SIZE" -> "4")) + StorageProperties(properties ++ Map("EXPORT_BATCH_SIZE" -> "4")) ) val sink = new BatchSizedSink(1L, "vm1", 2, columnMetadata, bucket) rows.foreach { row => @@ -55,7 +55,7 @@ class BatchSizedSinkTest extends FunSuite with BeforeAndAfterEach with DummyReco test("export several files with batch size smaller than total records") { val bucket = LocalBucket( outputPath.toUri.toString, - new StorageProperties(properties ++ Map("EXPORT_BATCH_SIZE" -> "3")) + StorageProperties(properties ++ Map("EXPORT_BATCH_SIZE" -> "3")) ) val sink = new BatchSizedSink(1L, "vm1", 7, columnMetadata, bucket) val newRows = rows ++ rows ++ rows ++ rows.take(1) diff --git a/src/test/scala/com/exasol/cloudetl/storage/StorageConnectionInformationTest.scala b/src/test/scala/com/exasol/cloudetl/storage/StorageConnectionInformationTest.scala new file mode 100644 index 00000000..c5e5a46c --- /dev/null +++ b/src/test/scala/com/exasol/cloudetl/storage/StorageConnectionInformationTest.scala @@ -0,0 +1,17 @@ +package com.exasol.cloudetl.storage + +import com.exasol.ExaConnectionInformation + +import org.scalatest.FunSuite + +class StorageConnectionInformationTest extends FunSuite { + + test("apply returns correct user and password") { + val info = StorageConnectionInformation("user", "p@sword") + assert(info.getUser() === "user") + assert(info.getPassword() === "p@sword") + assert(info.getAddress() === "") + assert(info.getType() === ExaConnectionInformation.ConnectionType.PASSWORD) + } + +} diff --git a/src/test/scala/com/exasol/cloudetl/storage/StoragePropertiesTest.scala b/src/test/scala/com/exasol/cloudetl/storage/StoragePropertiesTest.scala index c455adcc..46d22eed 100644 --- a/src/test/scala/com/exasol/cloudetl/storage/StoragePropertiesTest.scala +++ b/src/test/scala/com/exasol/cloudetl/storage/StoragePropertiesTest.scala @@ -1,9 +1,16 @@ package com.exasol.cloudetl.storage +import com.exasol.ExaConnectionInformation +import com.exasol.ExaMetadata + +import org.mockito.Mockito.times +import org.mockito.Mockito.verify +import org.mockito.Mockito.when import org.scalatest.BeforeAndAfterEach import org.scalatest.FunSuite +import org.scalatest.mockito.MockitoSugar -class StoragePropertiesTest extends FunSuite with BeforeAndAfterEach { +class StoragePropertiesTest extends FunSuite with BeforeAndAfterEach with MockitoSugar { private[this] var properties: Map[String, String] = _ @@ -66,6 +73,38 @@ class StoragePropertiesTest extends FunSuite with BeforeAndAfterEach { assert(BaseProperties(properties).getParallelism("nproc()") === "nproc()") } + test("getConnectionInformation throws if Exasol metadata is not provided") { + val thrown = intercept[IllegalArgumentException] { + BaseProperties(properties).getConnectionInformation() + } + assert(thrown.getMessage === "Exasol metadata is None!") + } + + test("getConnectionInformation returns storage connection information") { + properties = Map(StorageProperties.CONNECTION_NAME -> "connection_info") + val metadata = mock[ExaMetadata] + val connectionInfo: ExaConnectionInformation = new ExaConnectionInformation() { + override def getType(): 
ExaConnectionInformation.ConnectionType =
+        ExaConnectionInformation.ConnectionType.PASSWORD
+      override def getAddress(): String = ""
+      override def getUser(): String = "user"
+      override def getPassword(): String = "secret"
+    }
+
+    when(metadata.getConnection("connection_info")).thenReturn(connectionInfo)
+    assert(StorageProperties(properties, metadata).getConnectionInformation() === connectionInfo)
+    verify(metadata, times(1)).getConnection("connection_info")
+  }
+
+  test("hasNamedConnection returns false by default") {
+    assert(BaseProperties(properties).hasNamedConnection() === false)
+  }
+
+  test("hasNamedConnection returns true if connection name is set") {
+    properties = Map(StorageProperties.CONNECTION_NAME -> "named_connection")
+    assert(BaseProperties(properties).hasNamedConnection() === true)
+  }
+
   test("mkString returns empty string by default") {
     val str = BaseProperties(properties).mkString()
     assert(str.isEmpty === true)
@@ -84,6 +123,13 @@ class StoragePropertiesTest extends FunSuite with BeforeAndAfterEach {
     assert(StorageProperties(properties) === baseProperty)
   }
 
+  test("apply(map) with Exasol metadata returns correct StorageProperties") {
+    properties = Map("k" -> "v")
+    val metadata = mock[ExaMetadata]
+    val expected = StorageProperties(properties, metadata)
+    assert(StorageProperties(properties, metadata) === expected)
+  }
+
   test("apply(string) throws if input string does not contain separator") {
     val thrown = intercept[IllegalArgumentException] {
       StorageProperties("")
@@ -98,7 +144,15 @@ class StoragePropertiesTest extends FunSuite with BeforeAndAfterEach {
     assert(StorageProperties(mkStringResult) === baseProperty)
   }
 
+  test("apply(string) with Exasol metadata returns correct StorageProperties") {
+    properties = Map("k1" -> "v1", "k2" -> "v2")
+    val metadata = mock[ExaMetadata]
+    val expected = StorageProperties(properties, metadata)
+    val mkStringResult = BaseProperties(properties).mkString()
+    assert(StorageProperties(mkStringResult, metadata) === expected)
+  }
+
   private[this] case class BaseProperties(val params: Map[String, String])
-      extends StorageProperties(params)
+      extends StorageProperties(params, None)
 
 }

From f5fbc840d3305142d8a2ef21eb9278aa6bdd9894 Mon Sep 17 00:00:00 2001
From: Muhammet Orazov
Date: Sun, 1 Dec 2019 23:56:52 +0100
Subject: [PATCH 2/4] Redesign usage of Exasol named connection object.

Secure credentials are provided using the connection object's password
identification property.

For example, users can create a connection object as below, where
secure credentials are provided as 'KEY=VALUE' pairs separated by a
semicolon (';'):

```
CREATE CONNECTION S3_IMPORT_CONNECTION
TO ''
USER ''
IDENTIFIED BY 'S3_SECRET_KEY=<AWS_SECRET_KEY>;S3_SESSION_TOKEN=<AWS_SESSION_TOKEN>';
```

Or similarly, setting the access key in the IDENTIFIED BY section:

```
CREATE CONNECTION S3_IMPORT_CONNECTION
TO ''
USER ''
IDENTIFIED BY 'S3_ACCESS_KEY=<AWS_ACCESS_KEY>;S3_SECRET_KEY=<AWS_SECRET_KEY>;S3_SESSION_TOKEN=<AWS_SESSION_TOKEN>';
```

We then parse this string internally and update the properties
structure with the new key-value pairs.

This is a better design, since secure credentials usually consist of
more than a single username/password pair; moreover, this
implementation makes it easier to extend the previous non-connection
implementation.

This commit also adds support for the S3 session token, which is useful
when accessing S3 using Multi-Factor Authentication (MFA).
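As a sketch of the parsing described above (the function and parameter
names here are illustrative, not the exact project API): the password
field is split on ';', each 'KEY=VALUE' token becomes a map entry, and
the connection username is mapped to a caller-chosen key.

```
// Illustrative sketch only; assumes every token in the password string
// contains a '=' separator, as in the connection objects shown above.
def parseConnectionPassword(
  username: String,
  password: String,
  keyForUsername: String // hypothetical parameter, e.g. "S3_ACCESS_KEY"
): Map[String, String] = {
  val pairs = password.split(";").map { token =>
    val idx = token.indexOf('=')
    token.substring(0, idx) -> token.substring(idx + 1)
  }
  // The username entry comes first, so the same key appearing in the
  // password string overrides the username value.
  Map(keyForUsername -> username) ++ pairs
}
```

For example, parseConnectionPassword("", "S3_ACCESS_KEY=abc;S3_SECRET_KEY=xyz", "S3_ACCESS_KEY")
yields Map("S3_ACCESS_KEY" -> "abc", "S3_SECRET_KEY" -> "xyz").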
--- .../com/exasol/cloudetl/bucket/S3Bucket.scala | 31 +++-- .../exasol/cloudetl/bucket/SecureBucket.scala | 48 ++------ .../StorageConnectionInformation.scala | 12 -- .../cloudetl/storage/StorageProperties.scala | 29 +++++ .../cloudetl/bucket/AbstractBucketTest.scala | 7 ++ .../exasol/cloudetl/bucket/BucketTest.scala | 19 --- .../exasol/cloudetl/bucket/S3BucketTest.scala | 111 ++++++++++++++++++ .../cloudetl/bucket/SecureBucketTest.scala | 40 +------ .../StorageConnectionInformationTest.scala | 17 --- 9 files changed, 179 insertions(+), 135 deletions(-) delete mode 100644 src/main/scala/com/exasol/cloudetl/storage/StorageConnectionInformation.scala create mode 100644 src/test/scala/com/exasol/cloudetl/bucket/S3BucketTest.scala delete mode 100644 src/test/scala/com/exasol/cloudetl/storage/StorageConnectionInformationTest.scala diff --git a/src/main/scala/com/exasol/cloudetl/bucket/S3Bucket.scala b/src/main/scala/com/exasol/cloudetl/bucket/S3Bucket.scala index 2bba39c0..114173ae 100644 --- a/src/main/scala/com/exasol/cloudetl/bucket/S3Bucket.scala +++ b/src/main/scala/com/exasol/cloudetl/bucket/S3Bucket.scala @@ -12,6 +12,7 @@ final case class S3Bucket(path: String, params: StorageProperties) private[this] val S3_ENDPOINT: String = "S3_ENDPOINT" private[this] val S3_ACCESS_KEY: String = "S3_ACCESS_KEY" private[this] val S3_SECRET_KEY: String = "S3_SECRET_KEY" + private[this] val S3_SESSION_TOKEN: String = "S3_SESSION_TOKEN" /** @inheritdoc */ override val bucketPath: String = path @@ -19,15 +20,13 @@ final case class S3Bucket(path: String, params: StorageProperties) /** @inheritdoc */ override val properties: StorageProperties = params - /** @inheritdoc */ - override val accountName: String = S3_ACCESS_KEY - - /** @inheritdoc */ - override val accountSecret: String = S3_SECRET_KEY - /** Returns the list of required property keys for AWS S3 Storage. 
*/
   override def getRequiredProperties(): Seq[String] = Seq(S3_ENDPOINT)
 
+  /** @inheritdoc */
+  override def getSecureProperties(): Seq[String] = Seq(S3_SECRET_KEY, S3_SESSION_TOKEN)
+
+  /** @inheritdoc */
   override def validate(): Unit = {
     validateRequiredProperties()
     validateConnectionProperties()
@@ -46,9 +45,23 @@ final case class S3Bucket(path: String, params: StorageProperties)
     conf.set("fs.file.impl", classOf[org.apache.hadoop.fs.LocalFileSystem].getName)
     conf.set("fs.s3a.impl", classOf[org.apache.hadoop.fs.s3a.S3AFileSystem].getName)
     conf.set("fs.s3a.endpoint", properties.getString(S3_ENDPOINT))
-    val storageConnectionInfo = getStorageConnectionInformation()
-    conf.set("fs.s3a.access.key", storageConnectionInfo.getUser())
-    conf.set("fs.s3a.secret.key", storageConnectionInfo.getPassword())
+
+    val mergedProperties = if (properties.hasNamedConnection()) {
+      properties.merge(S3_ACCESS_KEY)
+    } else {
+      properties
+    }
+
+    conf.set("fs.s3a.access.key", mergedProperties.getString(S3_ACCESS_KEY))
+    conf.set("fs.s3a.secret.key", mergedProperties.getString(S3_SECRET_KEY))
+
+    if (mergedProperties.containsKey(S3_SESSION_TOKEN)) {
+      conf.set(
+        "fs.s3a.aws.credentials.provider",
+        classOf[org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider].getName
+      )
+      conf.set("fs.s3a.session.token", mergedProperties.getString(S3_SESSION_TOKEN))
+    }
 
     conf
   }
diff --git a/src/main/scala/com/exasol/cloudetl/bucket/SecureBucket.scala b/src/main/scala/com/exasol/cloudetl/bucket/SecureBucket.scala
index d47241f1..3424e3b1 100644
--- a/src/main/scala/com/exasol/cloudetl/bucket/SecureBucket.scala
+++ b/src/main/scala/com/exasol/cloudetl/bucket/SecureBucket.scala
@@ -1,7 +1,5 @@
 package com.exasol.cloudetl.bucket
 
-import com.exasol.cloudetl.storage.StorageConnectionInformation
-
 import com.typesafe.scalalogging.LazyLogging
 
 /**
@@ -11,25 +9,12 @@ import com.typesafe.scalalogging.LazyLogging
 trait SecureBucket extends LazyLogging { self: Bucket =>
 
   /**
-   * A placeholder variable for different access account methods in the
-   * specific bucket implementations.
-   *
-   * Having these variables abstracted makes the bucket implementations
-   * cleaner.
-   *
-   * For example, when accessing an S3 bucket, accountName is set as {@code
-   * S3_ACCESS_KEY}.
+   * Returns the list of property key names that are used as secure
+   * access credentials.
    *
-   * For example, when accessing an S3 bucket, accountSecret is set as
-   * {@code S3_SECRET_KEY}.
+   * For example, {@code S3_SECRET_KEY} when accessing an S3 bucket.
    */
-  val accountName: String
-
-  /**
-   * A placeholder variable for different access secret methods in the
-   * specific bucket implementations.
-   */
-  val accountSecret: String
+  def getSecureProperties(): Seq[String]
 
   /**
    * Validates that the named connection object or access credentials
@@ -38,13 +23,13 @@ trait SecureBucket extends LazyLogging { self: Bucket =>
   protected[this] final def validateConnectionProperties(): Unit = {
     if (hasSecureProperties()) {
       logger.info(
-        s"Using the secure credential properties $accountName and $accountSecret is deprecated. " +
+        "Using secure credential parameters is deprecated. " +
           "Please use an Exasol named connection object via CONNECTION_NAME property."
       )
     }
     val connectionExceptionMessage =
-      s"Please provide either only CONNECTION_NAME property or $accountName " +
-        s"and $accountSecret property pairs, but not both!"
+      "Please provide either only CONNECTION_NAME property or secure access " +
+        "credential property pairs, but not both!"
     if (properties.hasNamedConnection()) {
       if (hasSecureProperties()) {
         throw new IllegalArgumentException(connectionExceptionMessage)
       }
@@ -57,23 +42,6 @@ trait SecureBucket extends LazyLogging { self: Bucket =>
   }
 
   private[this] def hasSecureProperties(): Boolean =
-    properties.containsKey(accountName) && properties.containsKey(accountSecret)
-
-  /**
-   * Returns the [[com.exasol.ExaConnectionInformation]] Exasol named
-   * connection information object for this bucket.
-   */
-  final def getStorageConnectionInformation(): StorageConnectionInformation =
-    if (!properties.hasNamedConnection() && hasSecureProperties()) {
-      StorageConnectionInformation(
-        properties.getString(accountName),
-        properties.getString(accountSecret)
-      )
-    } else if (properties.hasNamedConnection()) {
-      val exaConnectionInfo = properties.getConnectionInformation()
-      StorageConnectionInformation(exaConnectionInfo.getUser(), exaConnectionInfo.getPassword())
-    } else {
-      throw new IllegalArgumentException("Please provide a CONNECTION_NAME property!")
-    }
+    getSecureProperties.exists(properties.containsKey(_))
 
 }
diff --git a/src/main/scala/com/exasol/cloudetl/storage/StorageConnectionInformation.scala b/src/main/scala/com/exasol/cloudetl/storage/StorageConnectionInformation.scala
deleted file mode 100644
index ab96fdf6..00000000
--- a/src/main/scala/com/exasol/cloudetl/storage/StorageConnectionInformation.scala
+++ /dev/null
@@ -1,12 +0,0 @@
-package com.exasol.cloudetl.storage
-
-import com.exasol.ExaConnectionInformation
-import com.exasol.ExaConnectionInformation.ConnectionType
-
-final case class StorageConnectionInformation(user: String, password: String)
-    extends ExaConnectionInformation {
-  override def getAddress(): String = ""
-  override def getPassword(): String = password
-  override def getUser(): String = user
-  override def getType(): ConnectionType = ConnectionType.PASSWORD
-}
diff --git a/src/main/scala/com/exasol/cloudetl/storage/StorageProperties.scala b/src/main/scala/com/exasol/cloudetl/storage/StorageProperties.scala
index 3205cc5d..d7cc36f9 100644
--- a/src/main/scala/com/exasol/cloudetl/storage/StorageProperties.scala
+++ b/src/main/scala/com/exasol/cloudetl/storage/StorageProperties.scala
@@ -64,6 +64,35 @@ class StorageProperties(
   final def hasNamedConnection(): Boolean =
     containsKey(CONNECTION_NAME)
 
+  /**
+   * Returns a new [[StorageProperties]] that merges the key-value pairs
+   * parsed from the user-provided Exasol named connection object.
+   */
+  final def merge(accountName: String): StorageProperties = {
+    val connectionParsedMap = parseConnectionInfo(accountName)
+    val newProperties = properties ++ connectionParsedMap
+    new StorageProperties(newProperties, exaMetadata)
+  }
+
+  /**
+   * Parses the connection object password into key-value pairs.
+   *
+   * If the connection object contains the username, it is mapped to the
+   * {@code keyForUsername} parameter. However, this value is
+   * overwritten if the provided key is available in the password string
+   * of the connection object.
+   */
+  private[this] def parseConnectionInfo(keyForUsername: String): Map[String, String] = {
+    val connection = getConnectionInformation()
+    val username = connection.getUser()
+    val password = connection.getPassword()
+    val map = password.split(";").map { str =>
+      val idx = str.indexOf('=')
+      str.substring(0, idx) -> str.substring(idx + 1)
+    }
+    Map(keyForUsername -> username) ++ map
+  }
+
   /**
    * Returns a string value of key-value property pairs.
* diff --git a/src/test/scala/com/exasol/cloudetl/bucket/AbstractBucketTest.scala b/src/test/scala/com/exasol/cloudetl/bucket/AbstractBucketTest.scala index d7c05992..d4a70652 100644 --- a/src/test/scala/com/exasol/cloudetl/bucket/AbstractBucketTest.scala +++ b/src/test/scala/com/exasol/cloudetl/bucket/AbstractBucketTest.scala @@ -1,10 +1,12 @@ package com.exasol.cloudetl.bucket +import com.exasol.ExaMetadata import com.exasol.cloudetl.storage.StorageProperties import org.scalatest.BeforeAndAfterEach import org.scalatest.FunSuite +@SuppressWarnings(Array("org.wartremover.warts.Overloading")) class AbstractBucketTest extends FunSuite with BeforeAndAfterEach { private[bucket] val PATH: String = "BUCKET_PATH" @@ -19,4 +21,9 @@ class AbstractBucketTest extends FunSuite with BeforeAndAfterEach { protected[this] final def getBucket(params: Map[String, String]): Bucket = Bucket(StorageProperties(params)) + protected[this] final def getBucket( + params: Map[String, String], + exaMetadata: ExaMetadata + ): Bucket = + Bucket(StorageProperties(params, exaMetadata)) } diff --git a/src/test/scala/com/exasol/cloudetl/bucket/BucketTest.scala b/src/test/scala/com/exasol/cloudetl/bucket/BucketTest.scala index bc06bf23..63cc2a26 100644 --- a/src/test/scala/com/exasol/cloudetl/bucket/BucketTest.scala +++ b/src/test/scala/com/exasol/cloudetl/bucket/BucketTest.scala @@ -1,7 +1,6 @@ package com.exasol.cloudetl.bucket import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem -import org.apache.hadoop.fs.s3a.S3AFileSystem @SuppressWarnings(Array("org.wartremover.warts.IsInstanceOf")) class BucketTest extends AbstractBucketTest { @@ -20,24 +19,6 @@ class BucketTest extends AbstractBucketTest { assert(bucket.isInstanceOf[LocalBucket]) } - test("apply returns S3Bucket") { - properties = Map( - PATH -> "s3a://my-bucket/", - FORMAT -> "ORC", - "S3_ENDPOINT" -> "eu-central-1", - "S3_ACCESS_KEY" -> "abc", - "S3_SECRET_KEY" -> "xyz" - ) - val bucket = getBucket(properties) - val conf = bucket.getConfiguration() - - assert(bucket.isInstanceOf[S3Bucket]) - assert(conf.get("fs.s3a.impl") === classOf[S3AFileSystem].getName) - assert(conf.get("fs.s3a.endpoint") === "eu-central-1") - assert(conf.get("fs.s3a.access.key") === "abc") - assert(conf.get("fs.s3a.secret.key") === "xyz") - } - test("apply returns GCSBucket") { properties = Map( PATH -> "gs://my-bucket/", diff --git a/src/test/scala/com/exasol/cloudetl/bucket/S3BucketTest.scala b/src/test/scala/com/exasol/cloudetl/bucket/S3BucketTest.scala new file mode 100644 index 00000000..35c4362b --- /dev/null +++ b/src/test/scala/com/exasol/cloudetl/bucket/S3BucketTest.scala @@ -0,0 +1,111 @@ +package com.exasol.cloudetl.bucket + +import com.exasol.ExaConnectionInformation +import com.exasol.ExaMetadata + +import org.apache.hadoop.fs.s3a.S3AFileSystem +import org.mockito.Mockito.when +import org.scalatest.mockito.MockitoSugar + +@SuppressWarnings(Array("org.wartremover.warts.IsInstanceOf")) +class S3BucketTest extends AbstractBucketTest with MockitoSugar { + + private[this] val defaultProperties = Map( + PATH -> "s3a://my-bucket/", + FORMAT -> "AVRO", + "S3_ENDPOINT" -> "eu-central-1" + ) + + private[this] val accessKey = "access" + private[this] val secretKey = "secret" + private[this] val sessionToken = "token" + + private[this] val accessProperties = defaultProperties ++ Map( + "S3_ACCESS_KEY" -> accessKey, + "S3_SECRET_KEY" -> secretKey, + "S3_SESSION_TOKEN" -> sessionToken + ) + + private[this] val configMappings = Map( + "fs.s3a.access.key" -> accessKey, + 
"fs.s3a.secret.key" -> secretKey, + "fs.s3a.session.token" -> sessionToken + ) + + private[this] def assertS3Bucket(bucket: Bucket, extraMappings: Map[String, String]): Unit = { + assert(bucket.isInstanceOf[S3Bucket]) + val conf = bucket.getConfiguration() + val defaultMappings = Map( + "fs.s3a.impl" -> classOf[S3AFileSystem].getName, + "fs.s3a.endpoint" -> "eu-central-1" + ) + (defaultMappings ++ extraMappings).foreach { + case (given, expected) => + assert(conf.get(given) === expected) + } + } + + test("apply returns S3Bucket with access and secret parameters") { + properties = accessProperties - "S3_SESSION_TOKEN" + val bucket = getBucket(properties) + assertS3Bucket(bucket, configMappings - "fs.s3a.session.token") + } + + test("apply returns S3Bucket with access, secret and session token parameters") { + properties = accessProperties + val bucket = getBucket(properties) + assertS3Bucket(bucket, configMappings) + } + + test("apply returns S3Bucket with secret from connection") { + properties = defaultProperties ++ Map( + "CONNECTION_NAME" -> "connection_info" + ) + val exaMetadata = mockConnectionInfo("access", "S3_SECRET_KEY=secret") + val bucket = getBucket(properties, exaMetadata) + assertS3Bucket(bucket, configMappings - "fs.s3a.session.token") + } + + test("apply returns S3Bucket with secret and session token from connection") { + properties = defaultProperties ++ Map( + "CONNECTION_NAME" -> "connection_info" + ) + val exaMetadata = mockConnectionInfo("access", "S3_SECRET_KEY=secret;S3_SESSION_TOKEN=token") + val bucket = getBucket(properties, exaMetadata) + assertS3Bucket(bucket, configMappings) + } + + // Access key is encoded in password value of connection object. + test("apply returns S3Bucket with access and secret from connection") { + properties = defaultProperties ++ Map( + "CONNECTION_NAME" -> "connection_info" + ) + val exaMetadata = mockConnectionInfo("", "S3_ACCESS_KEY=access;S3_SECRET_KEY=secret") + val bucket = getBucket(properties, exaMetadata) + assertS3Bucket(bucket, configMappings - "fs.s3a.session.token") + } + + test("apply returns S3Bucket with access, secret and session token from connection") { + properties = defaultProperties ++ Map( + "CONNECTION_NAME" -> "connection_info" + ) + val exaMetadata = + mockConnectionInfo("", "S3_ACCESS_KEY=access;S3_SECRET_KEY=secret;S3_SESSION_TOKEN=token") + val bucket = getBucket(properties, exaMetadata) + assertS3Bucket(bucket, configMappings) + } + + private[this] final def mockConnectionInfo(username: String, password: String): ExaMetadata = { + val metadata = mock[ExaMetadata] + val connectionInfo: ExaConnectionInformation = new ExaConnectionInformation() { + override def getType(): ExaConnectionInformation.ConnectionType = + ExaConnectionInformation.ConnectionType.PASSWORD + override def getAddress(): String = "" + override def getUser(): String = username + override def getPassword(): String = password + } + when(metadata.getConnection("connection_info")).thenReturn(connectionInfo) + metadata + } + +} diff --git a/src/test/scala/com/exasol/cloudetl/bucket/SecureBucketTest.scala b/src/test/scala/com/exasol/cloudetl/bucket/SecureBucketTest.scala index 816c6190..9e9018ff 100644 --- a/src/test/scala/com/exasol/cloudetl/bucket/SecureBucketTest.scala +++ b/src/test/scala/com/exasol/cloudetl/bucket/SecureBucketTest.scala @@ -1,17 +1,11 @@ package com.exasol.cloudetl.bucket -import com.exasol.ExaConnectionInformation import com.exasol.ExaMetadata -import com.exasol.cloudetl.storage.StorageConnectionInformation import 
com.exasol.cloudetl.storage.StorageProperties
 
 import org.apache.hadoop.conf.Configuration
-import org.mockito.Mockito.times
-import org.mockito.Mockito.verify
-import org.mockito.Mockito.when
-import org.scalatest.mockito.MockitoSugar
 
-class SecureBucketTest extends AbstractBucketTest with MockitoSugar {
+class SecureBucketTest extends AbstractBucketTest {
 
   test("validate throws if no access credentials are provided") {
     val thrown = intercept[IllegalArgumentException] {
       BaseSecureBucket(properties).validate()
     }
@@ -33,35 +27,6 @@ class SecureBucketTest extends AbstractBucketTest with MockitoSugar {
     assert(thrown.getMessage.contains("property pairs, but not both!"))
   }
 
-  test("getStorageConnectionInformation throws if no access properties are provided") {
-    val thrown = intercept[IllegalArgumentException] {
-      BaseSecureBucket(properties).getStorageConnectionInformation()
-    }
-    assert(thrown.getMessage === "Please provide a CONNECTION_NAME property!")
-  }
-
-  test("getStorageConnectionInformation returns secure properties, no named connection") {
-    properties = Map("accountNameProperty" -> "account", "accountSecretProperty" -> "sekret")
-    val expected = StorageConnectionInformation("account", "sekret")
-    assert(BaseSecureBucket(properties).getStorageConnectionInformation() === expected)
-  }
-
-  test("getStorageConnectionInformation returns connection info from Exasol named connection") {
-    properties = Map("CONNECTION_NAME" -> "connection_info")
-    val metadata = mock[ExaMetadata]
-    val connectionInfo: ExaConnectionInformation = new ExaConnectionInformation() {
-      override def getType(): ExaConnectionInformation.ConnectionType =
-        ExaConnectionInformation.ConnectionType.PASSWORD
-      override def getAddress(): String = ""
-      override def getUser(): String = "account"
-      override def getPassword(): String = "secRet"
-    }
-    when(metadata.getConnection("connection_info")).thenReturn(connectionInfo)
-    val expected = StorageConnectionInformation("account", "secRet")
-    assert(BaseSecureBucket(properties, metadata).getStorageConnectionInformation() === expected)
-    verify(metadata, times(1)).getConnection("connection_info")
-  }
-
   @SuppressWarnings(Array("org.wartremover.warts.DefaultArguments"))
   private[this] case class BaseSecureBucket(
     val params: Map[String, String],
@@ -72,12 +37,11 @@ class SecureBucketTest extends AbstractBucketTest {
 
     override val bucketPath = "local_path"
     override def getRequiredProperties: Seq[String] = Seq.empty[String]
+    override def getSecureProperties: Seq[String] = Seq("accountSecretProperty")
     override def getConfiguration: Configuration = new Configuration()
     override def validate(): Unit = {
       validateRequiredProperties()
       validateConnectionProperties()
     }
-    override val accountName = "accountNameProperty"
-    override val accountSecret = "accountSecretProperty"
   }
 }
diff --git a/src/test/scala/com/exasol/cloudetl/storage/StorageConnectionInformationTest.scala b/src/test/scala/com/exasol/cloudetl/storage/StorageConnectionInformationTest.scala
deleted file mode 100644
index c5e5a46c..00000000
--- a/src/test/scala/com/exasol/cloudetl/storage/StorageConnectionInformationTest.scala
+++ /dev/null
@@ -1,17 +0,0 @@
-package com.exasol.cloudetl.storage
-
-import com.exasol.ExaConnectionInformation
-
-import org.scalatest.FunSuite
-
-class StorageConnectionInformationTest extends FunSuite {
-
-  test("apply returns correct user and password") {
-    val info = StorageConnectionInformation("user", "p@sword")
-    assert(info.getUser() === "user")
-    assert(info.getPassword() === "p@sword")
-    assert(info.getAddress() ===
"") - assert(info.getType() === ExaConnectionInformation.ConnectionType.PASSWORD) - } - -} From 92c7dc41f621fe594c280e021727c501889d8c4e Mon Sep 17 00:00:00 2001 From: Muhammet Orazov Date: Wed, 4 Dec 2019 10:40:12 +0100 Subject: [PATCH 3/4] Add connection object for Azure Blob and Azure ADLS storages. --- .../cloudetl/bucket/AzureAdlsBucket.scala | 29 ++++-- .../cloudetl/bucket/AzureBlobBucket.scala | 48 +++++----- .../exasol/cloudetl/bucket/SecureBucket.scala | 4 +- .../cloudetl/storage/StorageProperties.scala | 16 +++- .../cloudetl/bucket/AbstractBucketTest.scala | 21 ++++- .../cloudetl/bucket/AzureAdlsBucketTest.scala | 77 +++++++++++++++ .../cloudetl/bucket/AzureBlobBucketTest.scala | 93 +++++++++++++------ .../exasol/cloudetl/bucket/BucketTest.scala | 27 ------ .../exasol/cloudetl/bucket/S3BucketTest.scala | 30 +++--- .../cloudetl/bucket/SecureBucketTest.scala | 6 +- 10 files changed, 235 insertions(+), 116 deletions(-) create mode 100644 src/test/scala/com/exasol/cloudetl/bucket/AzureAdlsBucketTest.scala diff --git a/src/main/scala/com/exasol/cloudetl/bucket/AzureAdlsBucket.scala b/src/main/scala/com/exasol/cloudetl/bucket/AzureAdlsBucket.scala index 38171e54..5b0076af 100644 --- a/src/main/scala/com/exasol/cloudetl/bucket/AzureAdlsBucket.scala +++ b/src/main/scala/com/exasol/cloudetl/bucket/AzureAdlsBucket.scala @@ -5,7 +5,9 @@ import com.exasol.cloudetl.storage.StorageProperties import org.apache.hadoop.conf.Configuration /** A [[Bucket]] implementation for the Azure Data Lake Storage */ -final case class AzureAdlsBucket(path: String, params: StorageProperties) extends Bucket { +final case class AzureAdlsBucket(path: String, params: StorageProperties) + extends Bucket + with SecureBucket { private[this] val AZURE_CLIENT_ID: String = "AZURE_CLIENT_ID" private[this] val AZURE_CLIENT_SECRET: String = "AZURE_CLIENT_SECRET" @@ -17,17 +19,23 @@ final case class AzureAdlsBucket(path: String, params: StorageProperties) extend /** @inheritdoc */ override val properties: StorageProperties = params - /** @inheritdoc */ - override def validate(): Unit = - validateRequiredProperties() - /** * Returns the list of required property keys for Azure Data Lake * Storage. 
    */
   override def getRequiredProperties(): Seq[String] =
+    Seq.empty[String]
+
+  /** @inheritdoc */
+  override def getSecureProperties(): Seq[String] =
     Seq(AZURE_CLIENT_ID, AZURE_CLIENT_SECRET, AZURE_DIRECTORY_ID)
 
+  /** @inheritdoc */
+  override def validate(): Unit = {
+    validateRequiredProperties()
+    validateConnectionProperties()
+  }
+
   /**
    * @inheritdoc
    *
@@ -38,9 +46,14 @@ final case class AzureAdlsBucket(path: String, params: StorageProperties) extend
     validate()
 
     val conf = new Configuration()
-    val clientId = properties.getString(AZURE_CLIENT_ID)
-    val clientSecret = properties.getString(AZURE_CLIENT_SECRET)
-    val directoryId = properties.getString(AZURE_DIRECTORY_ID)
+    val mergedProperties = if (properties.hasNamedConnection()) {
+      properties.merge(AZURE_CLIENT_ID)
+    } else {
+      properties
+    }
+    val clientId = mergedProperties.getString(AZURE_CLIENT_ID)
+    val clientSecret = mergedProperties.getString(AZURE_CLIENT_SECRET)
+    val directoryId = mergedProperties.getString(AZURE_DIRECTORY_ID)
     val tokenEndpoint = s"https://login.microsoftonline.com/$directoryId/oauth2/token"
     conf.set("fs.adl.impl", classOf[org.apache.hadoop.fs.adl.AdlFileSystem].getName)
     conf.set("fs.AbstractFileSystem.adl.impl", classOf[org.apache.hadoop.fs.adl.Adl].getName)
diff --git a/src/main/scala/com/exasol/cloudetl/bucket/AzureBlobBucket.scala b/src/main/scala/com/exasol/cloudetl/bucket/AzureBlobBucket.scala
index 3d8518ef..9a99401c 100644
--- a/src/main/scala/com/exasol/cloudetl/bucket/AzureBlobBucket.scala
+++ b/src/main/scala/com/exasol/cloudetl/bucket/AzureBlobBucket.scala
@@ -5,7 +5,9 @@ import com.exasol.cloudetl.storage.StorageProperties
 import org.apache.hadoop.conf.Configuration
 
 /** A [[Bucket]] implementation for the Azure Blob Storage */
-final case class AzureBlobBucket(path: String, params: StorageProperties) extends Bucket {
+final case class AzureBlobBucket(path: String, params: StorageProperties)
+    extends Bucket
+    with SecureBucket {
 
   private[this] val AZURE_ACCOUNT_NAME: String = "AZURE_ACCOUNT_NAME"
   private[this] val AZURE_CONTAINER_NAME: String = "AZURE_CONTAINER_NAME"
@@ -18,30 +20,20 @@ final case class AzureBlobBucket(path: String, params: StorageProperties) extend
   /** @inheritdoc */
   override val properties: StorageProperties = params
 
-  /** @inheritdoc */
-  override def validate(): Unit =
-    validateRequiredProperties()
-
   /**
    * Returns the list of required property keys for Azure Blob Storage.
    */
   override def getRequiredProperties(): Seq[String] =
     Seq(AZURE_ACCOUNT_NAME)
 
-  /**
-   * Validation method specific to the Azure Blob Storage.
-   *
-   * Validates required properties by calling parent {@code validate}
-   * method. Furthermore, checks whether either [[AZURE_SECRET_KEY]] or
-   * [[AZURE_SAS_TOKEN]] parameters are available.
-   */
-  private[this] def validateExtra(): Unit = {
-    validate()
-    if (!properties.containsKey(AZURE_SECRET_KEY) && !properties.containsKey(AZURE_SAS_TOKEN)) {
-      throw new IllegalArgumentException(
-        s"Please provide a value for either $AZURE_SECRET_KEY or $AZURE_SAS_TOKEN!"
-      )
-    }
+  /** @inheritdoc */
+  override def getSecureProperties(): Seq[String] =
+    Seq(AZURE_SECRET_KEY, AZURE_SAS_TOKEN)
+
+  /** @inheritdoc */
+  override def validate(): Unit = {
+    validateRequiredProperties()
+    validateConnectionProperties()
   }
 
   /**
@@ -51,7 +43,7 @@ final case class AzureBlobBucket(path: String, params: StorageProperties) extend
    * in order to create a configuration.
    */
   override def getConfiguration(): Configuration = {
-    validateExtra()
+    validate()
 
     val conf = new Configuration()
     conf.set("fs.azure", classOf[org.apache.hadoop.fs.azure.NativeAzureFileSystem].getName)
@@ -63,13 +55,19 @@ final case class AzureBlobBucket(path: String, params: StorageProperties) extend
       classOf[org.apache.hadoop.fs.azure.Wasbs].getName
     )
 
-    val accountName = properties.getString(AZURE_ACCOUNT_NAME)
-    if (properties.containsKey(AZURE_SAS_TOKEN)) {
-      val sasToken = properties.getString(AZURE_SAS_TOKEN)
-      val containerName = properties.getString(AZURE_CONTAINER_NAME)
+    val mergedProperties = if (properties.hasNamedConnection()) {
+      properties.merge(AZURE_ACCOUNT_NAME)
+    } else {
+      properties
+    }
+
+    val accountName = mergedProperties.getString(AZURE_ACCOUNT_NAME)
+    if (mergedProperties.containsKey(AZURE_SAS_TOKEN)) {
+      val sasToken = mergedProperties.getString(AZURE_SAS_TOKEN)
+      val containerName = mergedProperties.getString(AZURE_CONTAINER_NAME)
       conf.set(s"fs.azure.sas.$containerName.$accountName.blob.core.windows.net", sasToken)
     } else {
-      val secretKey = properties.getString(AZURE_SECRET_KEY)
+      val secretKey = mergedProperties.getString(AZURE_SECRET_KEY)
       conf.set(s"fs.azure.account.key.$accountName.blob.core.windows.net", secretKey)
     }
 
diff --git a/src/main/scala/com/exasol/cloudetl/bucket/SecureBucket.scala b/src/main/scala/com/exasol/cloudetl/bucket/SecureBucket.scala
index 3424e3b1..aaef1270 100644
--- a/src/main/scala/com/exasol/cloudetl/bucket/SecureBucket.scala
+++ b/src/main/scala/com/exasol/cloudetl/bucket/SecureBucket.scala
@@ -28,8 +28,8 @@ trait SecureBucket extends LazyLogging { self: Bucket =>
       )
     }
     val connectionExceptionMessage =
-      "Please provide either only CONNECTION_NAME property or secure access " +
-      "credential property pairs, but not the both!"
+      "Please provide either CONNECTION_NAME property or secure access " +
+      "credential parameters, but not both!"
     if (properties.hasNamedConnection()) {
       if (hasSecureProperties()) {
         throw new IllegalArgumentException(connectionExceptionMessage)
diff --git a/src/main/scala/com/exasol/cloudetl/storage/StorageProperties.scala b/src/main/scala/com/exasol/cloudetl/storage/StorageProperties.scala
index d7cc36f9..0be412c5 100644
--- a/src/main/scala/com/exasol/cloudetl/storage/StorageProperties.scala
+++ b/src/main/scala/com/exasol/cloudetl/storage/StorageProperties.scala
@@ -86,11 +86,19 @@ class StorageProperties(
     val connection = getConnectionInformation()
     val username = connection.getUser()
     val password = connection.getPassword();
-    val map = password.split(";").map { str =>
-      val idx = str.indexOf('=')
-      str.substring(0, idx) -> str.substring(idx + 1)
+    val map = password
+      .split(";")
+      .map { str =>
+        val idx = str.indexOf('=')
+        str.substring(0, idx) -> str.substring(idx + 1)
+      }
+      .toMap
+
+    if (username.isEmpty()) {
+      map
+    } else {
+      Map(keyForUsername -> username) ++ map
     }
-    Map(keyForUsername -> username) ++ map
   }
 
   /**
diff --git a/src/test/scala/com/exasol/cloudetl/bucket/AbstractBucketTest.scala b/src/test/scala/com/exasol/cloudetl/bucket/AbstractBucketTest.scala
index d4a70652..a38f0ae1 100644
--- a/src/test/scala/com/exasol/cloudetl/bucket/AbstractBucketTest.scala
+++ b/src/test/scala/com/exasol/cloudetl/bucket/AbstractBucketTest.scala
@@ -1,13 +1,16 @@
 package com.exasol.cloudetl.bucket
 
+import com.exasol.ExaConnectionInformation
 import com.exasol.ExaMetadata
 import com.exasol.cloudetl.storage.StorageProperties
 
+import org.mockito.Mockito.when
 import org.scalatest.BeforeAndAfterEach
 import org.scalatest.FunSuite
+import org.scalatest.mockito.MockitoSugar
 
 @SuppressWarnings(Array("org.wartremover.warts.Overloading"))
-class AbstractBucketTest extends FunSuite with BeforeAndAfterEach {
+class AbstractBucketTest extends FunSuite with BeforeAndAfterEach with MockitoSugar {
 
   private[bucket] val PATH: String = "BUCKET_PATH"
   private[bucket] val FORMAT: String = "DATA_FORMAT"
@@ -26,4 +29,20 @@
     exaMetadata: ExaMetadata
   ): Bucket =
     Bucket(StorageProperties(params, exaMetadata))
+
+  protected[this] final def mockConnectionInfo(
+    username: String,
+    password: String
+  ): ExaMetadata = {
+    val metadata = mock[ExaMetadata]
+    val connectionInfo: ExaConnectionInformation = new ExaConnectionInformation() {
+      override def getType(): ExaConnectionInformation.ConnectionType =
+        ExaConnectionInformation.ConnectionType.PASSWORD
+      override def getAddress(): String = ""
+      override def getUser(): String = username
+      override def getPassword(): String = password
+    }
+    when(metadata.getConnection("connection_info")).thenReturn(connectionInfo)
+    metadata
+  }
 }
diff --git a/src/test/scala/com/exasol/cloudetl/bucket/AzureAdlsBucketTest.scala b/src/test/scala/com/exasol/cloudetl/bucket/AzureAdlsBucketTest.scala
new file mode 100644
index 00000000..6ef68b8e
--- /dev/null
+++ b/src/test/scala/com/exasol/cloudetl/bucket/AzureAdlsBucketTest.scala
@@ -0,0 +1,77 @@
+package com.exasol.cloudetl.bucket
+
+@SuppressWarnings(Array("org.wartremover.warts.IsInstanceOf"))
+class AzureAdlsBucketTest extends AbstractBucketTest {
+
+  private[this] val defaultProperties = Map(
+    PATH -> "adl://container1.azuredatalakestore.net/avro-data/*",
+    FORMAT -> "AVRO"
+  )
+
+  private[this] val clientID = "clientID"
+  private[this] val clientSecret = "clientSecret"
+  private[this] val directoryID = "directoryID"
+
+  private[this] val configMappings = Map(
"dfs.adls.oauth2.client.id" -> clientID, + "dfs.adls.oauth2.credential" -> clientSecret, + "dfs.adls.oauth2.refresh.url" -> s"https://login.microsoftonline.com/$directoryID/oauth2/token" + ) + + private[this] def assertAzureAdlsBucket( + bucket: Bucket, + extraMappings: Map[String, String] + ): Unit = { + assert(bucket.isInstanceOf[AzureAdlsBucket]) + val conf = bucket.getConfiguration() + val defaultMappings = Map( + "fs.adl.impl" -> classOf[org.apache.hadoop.fs.adl.AdlFileSystem].getName, + "fs.AbstractFileSystem.adl.impl" -> classOf[org.apache.hadoop.fs.adl.Adl].getName, + "dfs.adls.oauth2.access.token.provider.type" -> "ClientCredential" + ) + (defaultMappings ++ extraMappings).foreach { + case (given, expected) => + assert(conf.get(given) === expected) + } + } + + test("apply throws if no connection name or credentials is provided") { + properties = defaultProperties + val thrown = intercept[IllegalArgumentException] { + assertAzureAdlsBucket(getBucket(properties), Map.empty[String, String]) + } + val expected = "Please provide either CONNECTION_NAME property or secure access " + + "credentials parameters, but not the both!" + assert(thrown.getMessage === expected) + } + + test("apply returns AzureAdlsBucket with client id, client secret and directory id") { + properties = defaultProperties ++ Map( + "AZURE_CLIENT_ID" -> clientID, + "AZURE_CLIENT_SECRET" -> clientSecret, + "AZURE_DIRECTORY_ID" -> directoryID + ) + val bucket = getBucket(properties) + assertAzureAdlsBucket(bucket, configMappings) + } + + test("apply returns with credentails from username and password of connection object") { + properties = defaultProperties ++ Map("CONNECTION_NAME" -> "connection_info") + val exaMetadata = mockConnectionInfo( + clientID, + s"AZURE_CLIENT_SECRET=$clientSecret;AZURE_DIRECTORY_ID=$directoryID" + ) + val bucket = getBucket(properties, exaMetadata) + assertAzureAdlsBucket(bucket, configMappings) + } + + test("apply returns with credentails from password of connection object") { + properties = defaultProperties ++ Map("CONNECTION_NAME" -> "connection_info") + val connectionInfoPassword = s"AZURE_CLIENT_ID=$clientID;" + + s"AZURE_CLIENT_SECRET=$clientSecret;AZURE_DIRECTORY_ID=$directoryID" + val exaMetadata = mockConnectionInfo("", connectionInfoPassword) + val bucket = getBucket(properties, exaMetadata) + assertAzureAdlsBucket(bucket, configMappings) + } + +} diff --git a/src/test/scala/com/exasol/cloudetl/bucket/AzureBlobBucketTest.scala b/src/test/scala/com/exasol/cloudetl/bucket/AzureBlobBucketTest.scala index 49d4fbb4..5d4fa4d7 100644 --- a/src/test/scala/com/exasol/cloudetl/bucket/AzureBlobBucketTest.scala +++ b/src/test/scala/com/exasol/cloudetl/bucket/AzureBlobBucketTest.scala @@ -7,44 +7,46 @@ import org.apache.hadoop.fs.azure.Wasbs @SuppressWarnings(Array("org.wartremover.warts.IsInstanceOf")) class AzureBlobBucketTest extends AbstractBucketTest { - private[this] def assertAzureBlobBucket(bucket: Bucket): Unit = { + private[this] val defaultProperties = Map( + PATH -> "wasbs://container@account1.windows.net/orc-data/", + FORMAT -> "ORC" + ) + + private[this] def assertAzureBlobBucket( + bucket: Bucket, + extraMappings: Map[String, String] + ): Unit = { assert(bucket.isInstanceOf[AzureBlobBucket]) val conf = bucket.getConfiguration() - val mappings = Map( + val defaultMappings = Map( "fs.azure" -> classOf[NativeAzureFileSystem].getName, "fs.wasb.impl" -> classOf[NativeAzureFileSystem].getName, "fs.wasbs.impl" -> classOf[NativeAzureFileSystem].getName, 
"fs.AbstractFileSystem.wasb.impl" -> classOf[Wasb].getName, "fs.AbstractFileSystem.wasbs.impl" -> classOf[Wasbs].getName ) - mappings.foreach { + (defaultMappings ++ extraMappings).foreach { case (given, expected) => assert(conf.get(given) === expected) } } - private[this] val defaultProperties = Map( - PATH -> "wasbs://container@account1/parquet-bucket/", - FORMAT -> "ORC" - ) - - test("getConfiguration throws if account name is not provided") { + test("apply throws if account name is not provided") { properties = defaultProperties val thrown = intercept[IllegalArgumentException] { - assertAzureBlobBucket(getBucket(properties)) + assertAzureBlobBucket(getBucket(properties), Map.empty[String, String]) } assert(thrown.getMessage === "Please provide a value for the AZURE_ACCOUNT_NAME property!") } - test("getConfiguration throws if neither secret key nor sas token account is provided") { + test("apply throws if no connection name or credential (secret key or sas token) is provided") { properties = defaultProperties ++ Map("AZURE_ACCOUNT_NAME" -> "account1") val thrown = intercept[IllegalArgumentException] { - assertAzureBlobBucket(getBucket(properties)) + assertAzureBlobBucket(getBucket(properties), Map.empty[String, String]) } - assert( - thrown.getMessage === "Please provide a value for either " + - "AZURE_SECRET_KEY or AZURE_SAS_TOKEN!" - ) + val expected = "Please provide either CONNECTION_NAME property or secure access " + + "credentials parameters, but not the both!" + assert(thrown.getMessage === expected) } test("apply returns AzureBlobBucket with secret key") { @@ -53,11 +55,9 @@ class AzureBlobBucketTest extends AbstractBucketTest { "AZURE_SECRET_KEY" -> "secret" ) val bucket = getBucket(properties) - assertAzureBlobBucket(bucket) - assert( - bucket - .getConfiguration() - .get("fs.azure.account.key.account1.blob.core.windows.net") === "secret" + assertAzureBlobBucket( + bucket, + Map("fs.azure.account.key.account1.blob.core.windows.net" -> "secret") ) } @@ -67,7 +67,7 @@ class AzureBlobBucketTest extends AbstractBucketTest { "AZURE_SAS_TOKEN" -> "token" ) val thrown = intercept[IllegalArgumentException] { - assertAzureBlobBucket(getBucket(properties)) + assertAzureBlobBucket(getBucket(properties), Map.empty[String, String]) } assert(thrown.getMessage === "Please provide a value for the AZURE_CONTAINER_NAME property!") } @@ -79,11 +79,50 @@ class AzureBlobBucketTest extends AbstractBucketTest { "AZURE_CONTAINER_NAME" -> "container1" ) val bucket = getBucket(properties) - assertAzureBlobBucket(bucket) - assert( - bucket - .getConfiguration() - .get("fs.azure.sas.container1.account1.blob.core.windows.net") === "token" + assertAzureBlobBucket( + bucket, + Map("fs.azure.sas.container1.account1.blob.core.windows.net" -> "token") + ) + } + + test("apply returns secret from password of connection object") { + properties = defaultProperties ++ Map( + "AZURE_ACCOUNT_NAME" -> "account1", + "CONNECTION_NAME" -> "connection_info" + ) + val exaMetadata = mockConnectionInfo("", "AZURE_SECRET_KEY=secret") + val bucket = getBucket(properties, exaMetadata) + assertAzureBlobBucket( + bucket, + Map("fs.azure.account.key.account1.blob.core.windows.net" -> "secret") + ) + } + + test("apply returns sas token from password of connection object") { + properties = defaultProperties ++ Map( + "AZURE_ACCOUNT_NAME" -> "account1", + "AZURE_CONTAINER_NAME" -> "container1", + "CONNECTION_NAME" -> "connection_info" + ) + val exaMetadata = mockConnectionInfo("", "AZURE_SAS_TOKEN=token") + val bucket = 
+    assertAzureBlobBucket(
+      bucket,
+      Map("fs.azure.sas.container1.account1.blob.core.windows.net" -> "token")
+    )
+  }
+
+  test("apply returns sas from connection object if both sas and secret are provided") {
+    properties = defaultProperties ++ Map(
+      "AZURE_ACCOUNT_NAME" -> "account1",
+      "AZURE_CONTAINER_NAME" -> "container1",
+      "CONNECTION_NAME" -> "connection_info"
+    )
+    val exaMetadata = mockConnectionInfo("", "AZURE_SECRET_KEY=secret;AZURE_SAS_TOKEN=token")
+    val bucket = getBucket(properties, exaMetadata)
+    assertAzureBlobBucket(
+      bucket,
+      Map("fs.azure.sas.container1.account1.blob.core.windows.net" -> "token")
     )
   }
 
diff --git a/src/test/scala/com/exasol/cloudetl/bucket/BucketTest.scala b/src/test/scala/com/exasol/cloudetl/bucket/BucketTest.scala
index 63cc2a26..4388de39 100644
--- a/src/test/scala/com/exasol/cloudetl/bucket/BucketTest.scala
+++ b/src/test/scala/com/exasol/cloudetl/bucket/BucketTest.scala
@@ -35,31 +35,4 @@ class BucketTest extends AbstractBucketTest {
     assert(conf.get("fs.gs.auth.service.account.json.keyfile") === "/bucketfs/bucket1/projX.json")
   }
 
-  test("apply returns AzureAdlsBucket") {
-    properties = Map(
-      PATH -> "adl://my_container.azuredatalakestore.net/orc/*",
-      FORMAT -> "CSV",
-      "AZURE_CLIENT_ID" -> "clientX",
-      "AZURE_CLIENT_SECRET" -> "client_secret",
-      "AZURE_DIRECTORY_ID" -> "directory_id_secret"
-    )
-    val bucket = getBucket(properties)
-    assert(bucket.isInstanceOf[AzureAdlsBucket])
-
-    val conf = bucket.getConfiguration()
-    val expectedSettings = Map(
-      "fs.adl.impl" -> classOf[org.apache.hadoop.fs.adl.AdlFileSystem].getName,
-      "fs.AbstractFileSystem.adl.impl" -> classOf[org.apache.hadoop.fs.adl.Adl].getName,
-      "dfs.adls.oauth2.access.token.provider.type" -> "ClientCredential",
-      "dfs.adls.oauth2.client.id" -> "clientX",
-      "dfs.adls.oauth2.credential" -> "client_secret",
-      "dfs.adls.oauth2.refresh.url" ->
-        "https://login.microsoftonline.com/directory_id_secret/oauth2/token"
-    )
-    expectedSettings.foreach {
-      case (given, expected) =>
-        assert(conf.get(given) === expected)
-    }
-  }
-
 }
diff --git a/src/test/scala/com/exasol/cloudetl/bucket/S3BucketTest.scala b/src/test/scala/com/exasol/cloudetl/bucket/S3BucketTest.scala
index 35c4362b..99a19fec 100644
--- a/src/test/scala/com/exasol/cloudetl/bucket/S3BucketTest.scala
+++ b/src/test/scala/com/exasol/cloudetl/bucket/S3BucketTest.scala
@@ -1,14 +1,9 @@
 package com.exasol.cloudetl.bucket
 
-import com.exasol.ExaConnectionInformation
-import com.exasol.ExaMetadata
-
 import org.apache.hadoop.fs.s3a.S3AFileSystem
-import org.mockito.Mockito.when
-import org.scalatest.mockito.MockitoSugar
 
 @SuppressWarnings(Array("org.wartremover.warts.IsInstanceOf"))
-class S3BucketTest extends AbstractBucketTest with MockitoSugar {
+class S3BucketTest extends AbstractBucketTest {
 
   private[this] val defaultProperties = Map(
     PATH -> "s3a://my-bucket/",
@@ -45,6 +40,16 @@ class S3BucketTest extends AbstractBucketTest {
     }
   }
 
+  test("apply throws when neither secrets nor connection name is provided") {
+    properties = defaultProperties
+    val thrown = intercept[IllegalArgumentException] {
+      assertS3Bucket(getBucket(properties), Map.empty[String, String])
+    }
+    val expected = "Please provide either CONNECTION_NAME property or secure access " +
+      "credential parameters, but not both!"
+    assert(thrown.getMessage === expected)
+  }
+
   test("apply returns S3Bucket with access and secret parameters") {
     properties = accessProperties - "S3_SESSION_TOKEN"
     val bucket = getBucket(properties)
@@ -95,17 +100,4 @@ class S3BucketTest extends AbstractBucketTest {
     assertS3Bucket(bucket, configMappings)
   }
 
-  private[this] final def mockConnectionInfo(username: String, password: String): ExaMetadata = {
-    val metadata = mock[ExaMetadata]
-    val connectionInfo: ExaConnectionInformation = new ExaConnectionInformation() {
-      override def getType(): ExaConnectionInformation.ConnectionType =
-        ExaConnectionInformation.ConnectionType.PASSWORD
-      override def getAddress(): String = ""
-      override def getUser(): String = username
-      override def getPassword(): String = password
-    }
-    when(metadata.getConnection("connection_info")).thenReturn(connectionInfo)
-    metadata
-  }
-
 }
diff --git a/src/test/scala/com/exasol/cloudetl/bucket/SecureBucketTest.scala b/src/test/scala/com/exasol/cloudetl/bucket/SecureBucketTest.scala
index 9e9018ff..ce547ff4 100644
--- a/src/test/scala/com/exasol/cloudetl/bucket/SecureBucketTest.scala
+++ b/src/test/scala/com/exasol/cloudetl/bucket/SecureBucketTest.scala
@@ -11,7 +11,7 @@ class SecureBucketTest extends AbstractBucketTest {
     val thrown = intercept[IllegalArgumentException] {
       BaseSecureBucket(properties).validate()
     }
-    assert(thrown.getMessage.contains("Please provide either only CONNECTION_NAME property or"))
+    assert(thrown.getMessage.contains("Please provide either CONNECTION_NAME property or"))
   }
 
   test("validate throws if both connection name and access properties are provided") {
@@ -23,8 +23,8 @@ class SecureBucketTest extends AbstractBucketTest {
     val thrown = intercept[IllegalArgumentException] {
       BaseSecureBucket(properties).validate()
     }
-    assert(thrown.getMessage.contains("Please provide either only CONNECTION_NAME property or"))
-    assert(thrown.getMessage.contains("property pairs, but not the both!"))
+    assert(thrown.getMessage.contains("Please provide either CONNECTION_NAME property or"))
+    assert(thrown.getMessage.contains("secure access credential parameters, but not both!"))
   }
 
   @SuppressWarnings(Array("org.wartremover.warts.DefaultArguments"))

From 0693471b04920df24e957218c878dddb699c1006 Mon Sep 17 00:00:00 2001
From: Muhammet Orazov
Date: Thu, 5 Dec 2019 23:40:02 +0100
Subject: [PATCH 4/4] Update docs to include connection object usage.

---
 docs/storage/cloud_storages.md | 269 ++++++++++++++++++++++++++-------
 1 file changed, 216 insertions(+), 53 deletions(-)

diff --git a/docs/storage/cloud_storages.md b/docs/storage/cloud_storages.md
index 47c56bf0..280cd3d0 100644
--- a/docs/storage/cloud_storages.md
+++ b/docs/storage/cloud_storages.md
@@ -7,6 +7,7 @@ formats.
 ## Table of contents
 
 - [Prerequisites](#prerequisites)
+- [Exasol Named Connection Object](#exasol-named-connection-object)
 - [Amazon S3](#amazon-s3)
 - [Google Cloud Storage](#google-cloud-storage)
 - [Azure Blob Storage](#azure-blob-storage)
@@ -22,8 +23,15 @@ guide](../deployment_guide.md) if you have not done so.
 Additionally, you can read the [user guide](../user_guide.md) in order to get
 familiar with cloud-storage-etl-udfs in general.
 
-In this guide, we will be using `RETAIL` schema and `SALES_POSITIONS` table in
-examples.
+## Exasol Named Connection Object
+
+In this guide, we will be providing examples using the `RETAIL` schema and
+`SALES_POSITIONS` table.
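+
+For reference, a minimal version of the `SALES_POSITIONS` table could look
+like the following sketch; the column list here is only illustrative, not an
+exact schema, so adapt it to your own data:
+
+```sql
+-- Hypothetical table layout used by the examples in this guide.
+CREATE OR REPLACE TABLE RETAIL.SALES_POSITIONS (
+  SALES_ID    DECIMAL(18,0),
+  POSITION_ID DECIMAL(9,0),
+  ARTICLE_ID  DECIMAL(9,0),
+  AMOUNT      DECIMAL(9,0),
+  PRICE       DECIMAL(9,2)
+);
+```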
+
+Moreover, we are going to show how to provide secure access credentials using
+UDF parameters and the [Exasol named connection object][exa-connection]. We
+highly **recommend** using connection objects to provide credentials to UDFs
+so that secrets are not displayed in audit logs.
 
 ## Amazon S3
 
@@ -41,6 +49,9 @@ access key and secret key.
 - `S3_ACCESS_KEY`
 - `S3_SECRET_KEY`
 
+However, when using Multi Factor Authentication (MFA), you can also provide an
+additional session token using the `S3_SESSION_TOKEN` parameter.
+
 Please follow the [Amazon credentials management best
 practices][aws-creds] when creating credentials.
 
@@ -52,25 +63,77 @@ An endpoint is the URL of the entry point for an AWS resource. For example,
 `https://dynamodb.us-west-2.amazonaws.com` is the endpoint for the Amazon
 DynamoDB service in the US West (Oregon) Region.
 
-### Import from S3
+### Using connection object
+
+First of all, create a named connection object and encode the credentials as
+key-value pairs separated by a semicolon (`;`).
+
+Using AWS access and secret keys:
+
+```sql
+CREATE OR REPLACE CONNECTION S3_CONNECTION
+TO ''
+USER ''
+IDENTIFIED BY 'S3_ACCESS_KEY=<ACCESS_KEY>;S3_SECRET_KEY=<SECRET_KEY>';
+```
+
+Or together with a session token:
+
+```sql
+CREATE OR REPLACE CONNECTION S3_CONNECTION
+TO ''
+USER ''
+IDENTIFIED BY 'S3_ACCESS_KEY=<ACCESS_KEY>;S3_SECRET_KEY=<SECRET_KEY>;S3_SESSION_TOKEN=<SESSION_TOKEN>';
+```
+
+#### Import
 
 ```sql
 IMPORT INTO RETAIL.SALES_POSITIONS
 FROM SCRIPT ETL.IMPORT_PATH WITH
-  BUCKET_PATH = 's3a://<S3_BUCKET>/data/orc/sales_positions/*'
-  DATA_FORMAT = 'ORC'
-  S3_ACCESS_KEY = '<ACCESS_KEY>'
-  S3_SECRET_KEY = '<SECRET_KEY>'
-  S3_ENDPOINT = 's3.<REGION>.amazonaws.com'
-  PARALLELISM = 'nproc()*2';
+  BUCKET_PATH = 's3a://<S3_BUCKET>/import/orc/sales_positions/*'
+  DATA_FORMAT = 'ORC'
+  S3_ENDPOINT = 's3.<REGION>.amazonaws.com'
+  CONNECTION_NAME = 'S3_CONNECTION'
+  PARALLELISM = 'nproc()*2';
 ```
 
-### Export to S3
+#### Export
 
 ```sql
 EXPORT RETAIL.SALES_POSITIONS
 INTO SCRIPT ETL.EXPORT_PATH WITH
-  BUCKET_PATH = 's3a://<S3_BUCKET>/data/parquet/sales_positions/'
+  BUCKET_PATH = 's3a://<S3_BUCKET>/export/parquet/sales_positions/'
+  DATA_FORMAT = 'PARQUET'
+  S3_ENDPOINT = 's3.<REGION>.amazonaws.com'
+  CONNECTION_NAME = 'S3_CONNECTION'
+  PARALLELISM = 'iproc(), floor(random()*2)';
+```
+
+### Using UDF Parameters
+
+In this case, you should provide each access credential as a key-value
+parameter.
+
+#### Import
+
+```sql
+IMPORT INTO RETAIL.SALES_POSITIONS
+FROM SCRIPT ETL.IMPORT_PATH WITH
+  BUCKET_PATH = 's3a://<S3_BUCKET>/import/orc/sales_positions/*'
+  DATA_FORMAT = 'ORC'
+  S3_ACCESS_KEY = '<ACCESS_KEY>'
+  S3_SECRET_KEY = '<SECRET_KEY>'
+  S3_SESSION_TOKEN = '<SESSION_TOKEN>'
+  S3_ENDPOINT = 's3.<REGION>.amazonaws.com'
+  PARALLELISM = 'nproc()*2';
+```
+
+#### Export
+
+```sql
+EXPORT RETAIL.SALES_POSITIONS
+INTO SCRIPT ETL.EXPORT_PATH WITH
+  BUCKET_PATH = 's3a://<S3_BUCKET>/export/parquet/sales_positions/'
   DATA_FORMAT = 'PARQUET'
   S3_ACCESS_KEY = '<ACCESS_KEY>'
   S3_SECRET_KEY = '<SECRET_KEY>'
@@ -123,24 +186,26 @@ curl -X PUT -T gcp-<PROJECT_ID>-service-keyfile.json \
 Please make sure that the bucket is **secure** and only **readable by users**
 who run the UDFs.
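+
+To verify the upload, you can, for example, list the bucket contents over
+HTTP. This is only a sketch; the host name, port, bucket name and read
+password all depend on your own BucketFS setup:
+
+```bash
+# Lists the files stored in the bucket; 'r' is the read user of the bucket.
+curl http://r:<READ_PASSWORD>@<EXASOL_DATANODE>:2580/<BUCKET>/
+```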
 
-### Import from GCS
+### Using UDF Parameters
+
+#### Import from GCS
 
 ```sql
 IMPORT INTO RETAIL.SALES_POSITIONS
 FROM SCRIPT ETL.IMPORT_PATH WITH
-  BUCKET_PATH = 'gs://<GCS_BUCKET>/data/avro/sales_positions/*'
+  BUCKET_PATH = 'gs://<GCS_BUCKET>/import/avro/sales_positions/*'
   DATA_FORMAT = 'AVRO'
   GCS_PROJECT_ID = '<PROJECT_ID>'
   GCS_KEYFILE_PATH = '/buckets/bfsdefault/<BUCKET>/gcp-<PROJECT_ID>-service-keyfile.json'
   PARALLELISM = 'nproc()*4';
 ```
 
-### Export to GCS
+#### Export to GCS
 
 ```sql
 EXPORT RETAIL.SALES_POSITIONS
 INTO SCRIPT ETL.EXPORT_PATH WITH
-  BUCKET_PATH = 'gs://<GCS_BUCKET>/data/parquet/sales_positions/'
+  BUCKET_PATH = 'gs://<GCS_BUCKET>/export/parquet/sales_positions/'
   DATA_FORMAT = 'PARQUET'
   GCS_PROJECT_ID = '<PROJECT_ID>'
   GCS_KEYFILE_PATH = '/buckets/bfsdefault/<BUCKET>/gcp-<PROJECT_ID>-service-keyfile.json'
@@ -157,8 +222,8 @@ learn more about Exasol synchronous cluster filesystem BucketFS.
 
 ## Azure Blob Storage
 
-Azure Blob Storage containers can be accessed using two possible authotization
-mechanism.
+Azure Blob Storage containers can be accessed using two possible authorization
+mechanisms.
 
 - ``AZURE_SECRET_KEY``
 - ``AZURE_SAS_TOKEN``
 
@@ -167,69 +232,130 @@ The **AZURE_SECRET_KEY** is 512-bit storage access keys that can be generated
 after creating a storage account. It is used to authorize access to the
 storage accounts.
 
-THE **AZURE_SAS_TOKEN** is Shared Access Signature (SAS) that provides secure
+The **AZURE_SAS_TOKEN** is a Shared Access Signature (SAS) that provides secure
 access to a storage account with granular control over how the clients can
 access the data.
 
 You should provide either one of these parameters when using
 cloud-storage-etl-udfs to access the Azure Blob Storage containers.
 
-Additionally, you need to obtain the Azure Blob store account name and container
-name and provide them as UDF parameters.
-
-- ``AZURE_ACCOUNT_NAME``
-- ``AZURE_CONTAINER_NAME``
-
-The **AZURE_CONTAINER_NAME** parameter is optional if you are using storage
-account access keys. However, it should still be available in the
-``BUCKET_PATH`` property value string.
-
 Please refer to Azure documentation on [creating storage
 account][azure-blob-account], managing [storage access
 keys][azure-blob-keys] and using [shared access signatures
 (SAS)][azure-blob-sas].
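+
+Note that the connection object and the secure access parameters are mutually
+exclusive. If a statement provides both a `CONNECTION_NAME` and one of the
+credential parameters, the UDF validation rejects it with an error similar to
+"Please provide either CONNECTION_NAME property or secure access credential
+parameters, but not both!". A sketch of such an invalid combination, using the
+connection object created in the next subsection:
+
+```sql
+-- Invalid: mixes a named connection with an explicit secret key.
+IMPORT INTO RETAIL.SALES_POSITIONS
+FROM SCRIPT ETL.IMPORT_PATH WITH
+  BUCKET_PATH = 'wasbs://<CONTAINER_NAME>@<ACCOUNT_NAME>.blob.core.windows.net/import/orc/*'
+  DATA_FORMAT = 'ORC'
+  CONNECTION_NAME = 'AZURE_BLOB_SECRET_CONNECTION'
+  AZURE_SECRET_KEY = '<SECRET_KEY>'
+  PARALLELISM = 'nproc()';
+```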
-### Import from Blob Storage using access key +### Using connection object + +Create a named connection for using with Azure secret key: + +```sql +CREATE OR REPLACE CONNECTION AZURE_BLOB_SECRET_CONNECTION +TO '' +USER '' +IDENTIFIED BY 'AZURE_SECRET_KEY='; +``` + +Or for using with Azure SAS token: + +```sql +CREATE OR REPLACE CONNECTION AZURE_BLOB_SAS_CONNECTION +TO '' +USER '' +IDENTIFIED BY 'AZURE_SAS_TOKEN='; +``` + +#### Import using secret key connection object ```sql IMPORT INTO RETAIL.SALES_POSITIONS FROM SCRIPT ETL.IMPORT_PATH WITH - BUCKET_PATH = 'wasbs://@.blob.core.windows.net/data/orc/sales-positions/*' - DATA_FORMAT = 'ORC' - AZURE_ACCOUNT_NAME = '' - AZURE_SECRET_KEY = '' - PARALLELISM = 'nproc()'; + BUCKET_PATH = 'wasbs://@.blob.core.windows.net/import/orc/*' + DATA_FORMAT = 'ORC' + CONNECTION_NAME = 'AZURE_BLOB_SECRET_CONNECTION' + PARALLELISM = 'nproc()'; ``` -### Import from Blob Storage using SAS token +#### Import using SAS token connection object ```sql IMPORT INTO RETAIL.SALES_POSITIONS FROM SCRIPT ETL.IMPORT_PATH WITH - BUCKET_PATH = 'wasbs://@.blob.core.windows.net/data/orc/sales-positions/*' - DATA_FORMAT = 'ORC' - AZURE_ACCOUNT_NAME = '' - AZURE_CONTAINER_NAME = '' - AZURE_SAS_TOKEN = '' - PARALLELISM = 'nproc()'; + BUCKET_PATH = 'wasbs://@.blob.core.windows.net/import/orc/*' + DATA_FORMAT = 'ORC' + CONNECTION_NAME = 'AZURE_BLOB_SAS_CONNECTION' + PARALLELISM = 'nproc()'; ``` -### Export to Blob Storage +#### Export using secret key connection object ```sql EXPORT RETAIL.SALES_POSITIONS INTO SCRIPT ETL.EXPORT_PATH WITH - BUCKET_PATH = 'wasbs://@.blob.core.windows.net/data/parquet/sales-positions/' - DATA_FORMAT = 'PARQUET' - AZURE_ACCOUNT_NAME = '' - AZURE_SECRET_KEY = '' - PARALLELISM = 'iproc()'; + BUCKET_PATH = 'wasbs://@.blob.core.windows.net/export/parquet/' + DATA_FORMAT = 'PARQUET' + CONNECTION_NAME = 'AZURE_BLOB_SECRET_CONNECTION' + PARALLELISM = 'iproc()'; ``` -Similar to import, you can also use the SAS token when exporting. +#### Export using SAS token connection object + +```sql +EXPORT RETAIL.SALES_POSITIONS +INTO SCRIPT ETL.EXPORT_PATH WITH + BUCKET_PATH = 'wasbs://@.blob.core.windows.net/export/parquet/' + DATA_FORMAT = 'PARQUET' + CONNECTION_NAME = 'AZURE_BLOB_SAS_CONNECTION' + PARALLELISM = 'iproc()'; +``` + +### Using UDF Parameters + +#### Import using secret key + +```sql +IMPORT INTO RETAIL.SALES_POSITIONS +FROM SCRIPT ETL.IMPORT_PATH WITH + BUCKET_PATH = 'wasbs://@.blob.core.windows.net/import/orc/*' + DATA_FORMAT = 'ORC' + AZURE_SECRET_KEY = '' + PARALLELISM = 'nproc()'; +``` + +#### Import using SAS token + +```sql +IMPORT INTO RETAIL.SALES_POSITIONS +FROM SCRIPT ETL.IMPORT_PATH WITH + BUCKET_PATH = 'wasbs://@.blob.core.windows.net/import/orc/*' + DATA_FORMAT = 'ORC' + AZURE_SAS_TOKEN = '' + PARALLELISM = 'nproc()'; +``` + +#### Export using secret key + +```sql +EXPORT RETAIL.SALES_POSITIONS +INTO SCRIPT ETL.EXPORT_PATH WITH + BUCKET_PATH = 'wasbs://@.blob.core.windows.net/export/parquet/' + DATA_FORMAT = 'PARQUET' + AZURE_SECRET_KEY = '' + PARALLELISM = 'iproc()'; +``` + +#### Export using SAS token + +```sql +EXPORT RETAIL.SALES_POSITIONS +INTO SCRIPT ETL.EXPORT_PATH WITH + BUCKET_PATH = 'wasbs://@.blob.core.windows.net/export/parquet/' + DATA_FORMAT = 'PARQUET' + AZURE_SAS_TOKEN = '' + PARALLELISM = 'iproc()'; +``` The Azure Blob Storage container path URI scheme can be `wasbs` or `wasb`. -## Azure Data Lake Storage +## Azure Data Lake (Gen1) Storage Currently only Azure Data Lake Storage Gen1 version is supported. 
@@ -255,12 +381,48 @@ documentation pages should show how obtain required configuration settings.
 
 Finally, make sure that the client id has access permissions to the Gen1
 storage container or its child directories.
 
-### Import from Data Lake (Gen1) Storage
+### Using connection object
+
+Create a named connection object that includes the secure credentials for
+Azure ADLS storage in the identification field:
+
+```sql
+CREATE OR REPLACE CONNECTION AZURE_ADLS_CONNECTION
+TO ''
+USER ''
+IDENTIFIED BY 'AZURE_CLIENT_ID=<CLIENT_ID>;AZURE_CLIENT_SECRET=<CLIENT_SECRET>;AZURE_DIRECTORY_ID=<DIRECTORY_ID>';
+```
+
+#### Import
+
+```sql
+IMPORT INTO RETAIL.SALES_POSITIONS
+FROM SCRIPT ETL.IMPORT_PATH WITH
+  BUCKET_PATH = 'adl://<ACCOUNT_NAME>.azuredatalakestore.net/import/avro/*'
+  DATA_FORMAT = 'AVRO'
+  CONNECTION_NAME = 'AZURE_ADLS_CONNECTION'
+  PARALLELISM = 'nproc()';
+```
+
+#### Export
+
+```sql
+EXPORT RETAIL.SALES_POSITIONS
+INTO SCRIPT ETL.EXPORT_PATH WITH
+  BUCKET_PATH = 'adl://<ACCOUNT_NAME>.azuredatalakestore.net/export/parquet/'
+  DATA_FORMAT = 'PARQUET'
+  CONNECTION_NAME = 'AZURE_ADLS_CONNECTION'
+  PARALLELISM = 'iproc()';
+```
+
+### Using UDF Parameters
+
+#### Import
 
 ```sql
 IMPORT INTO RETAIL.SALES_POSITIONS
 FROM SCRIPT ETL.IMPORT_PATH WITH
-  BUCKET_PATH = 'adl://<ACCOUNT_NAME>.azuredatalakestore.net/data/avro/sales_positions/*'
+  BUCKET_PATH = 'adl://<ACCOUNT_NAME>.azuredatalakestore.net/import/avro/*'
   DATA_FORMAT = 'AVRO'
   AZURE_CLIENT_ID = '<CLIENT_ID>'
   AZURE_CLIENT_SECRET = '<CLIENT_SECRET>'
   AZURE_DIRECTORY_ID = '<DIRECTORY_ID>'
   PARALLELISM = 'nproc()';
 ```
 
-### Export to Data Lake (Gen1) Storage
+#### Export
 
 ```sql
 EXPORT RETAIL.SALES_POSITIONS
 INTO SCRIPT ETL.EXPORT_PATH WITH
-  BUCKET_PATH = 'adl://<ACCOUNT_NAME>.azuredatalakestore.net/data/parquet/sales_positions/'
+  BUCKET_PATH = 'adl://<ACCOUNT_NAME>.azuredatalakestore.net/export/parquet/'
   DATA_FORMAT = 'PARQUET'
   AZURE_CLIENT_ID = '<CLIENT_ID>'
   AZURE_CLIENT_SECRET = '<CLIENT_SECRET>'
@@ -283,6 +445,7 @@ INTO SCRIPT ETL.EXPORT_PATH WITH
 
 The container path should start with the `adl` URI scheme.
 
+[exa-connection]: https://docs.exasol.com/sql/create_connection.htm
 [aws-creds]: https://docs.aws.amazon.com/general/latest/gr/aws-sec-cred-types.html
 [gcp-projects]: https://cloud.google.com/resource-manager/docs/creating-managing-projects
 [gcp-auth-intro]: https://cloud.google.com/compute/docs/access/service-accounts