diff --git a/CHANGES.md b/CHANGES.md
index 43e719ce..93330d75 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -2,7 +2,7 @@
 > 2019 OCT 31 :jack_o_lantern:
 
-* Adds Apach Kafka consumer UDF to import Avro formatted data from Kafka
+* Adds Apache Kafka consumer UDF to import Avro formatted data from Kafka
   clusters.
   [#40](https://github.com/exasol/cloud-storage-etl-udfs/issues/40)
   [#39](https://github.com/exasol/cloud-storage-etl-udfs/pull/39)
   [#48](https://github.com/exasol/cloud-storage-etl-udfs/pull/48)
diff --git a/docs/storage/cloud_storages.md b/docs/storage/cloud_storages.md
index 47c56bf0..280cd3d0 100644
--- a/docs/storage/cloud_storages.md
+++ b/docs/storage/cloud_storages.md
@@ -7,6 +7,7 @@ formats.
 ## Table of contents
 
 - [Prerequisites](#prerequisites)
+- [Exasol Named Connection Object](#exasol-named-connection-object)
 - [Amazon S3](#amazon-s3)
 - [Google Cloud Storage](#google-cloud-storage)
 - [Azure Blob Storage](#azure-blob-storage)
@@ -22,8 +23,15 @@ guide](../deployment_guide.md) if you have not done so. Additionally, you can
 read the [user guide](../user_guide.md) in order to get familiar with
 cloud-storage-etl-udfs in general.
 
-In this guide, we will be using `RETAIL` schema and `SALES_POSITIONS` table in
-examples.
+## Exasol Named Connection Object
+
+In this guide, we provide examples using the `RETAIL` schema and the
+`SALES_POSITIONS` table.
+
+Moreover, we show how to provide secure access credentials using UDF
+parameters and an [Exasol named connection object][exa-connection]. We highly
+**recommend** using connection objects to provide credentials to UDFs so that
+secrets are not displayed in audit logs.
 
 ## Amazon S3
 
@@ -41,6 +49,9 @@ access key and secret key.
 - `S3_ACCESS_KEY`
 - `S3_SECRET_KEY`
 
+However, when using Multi-Factor Authentication (MFA), you can also provide an
+additional session token using the `S3_SESSION_TOKEN` parameter.
+
 Please follow the [Amazon credentials management best practices][aws-creds]
 when creating credentials.
 
@@ -52,25 +63,77 @@ An endpoint is the URL of the entry point for an AWS resource.
 For example, `https://dynamodb.us-west-2.amazonaws.com` is the endpoint for the
 Amazon DynamoDB service in the US West (Oregon) Region.
 
-### Import from S3
+### Using connection object
+
+First of all, create a named connection object and encode the credentials as
+key-value pairs separated by a semicolon (`;`).
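+
+Note that the connection object's `TO` and `USER` fields are not required for
+the storage access itself, so the examples below leave them empty. Based on the
+connection merge logic introduced in this change (`StorageProperties.merge`),
+the `USER` field can alternatively hold the access key instead of repeating it
+in the `IDENTIFIED BY` field. A minimal sketch, assuming placeholder values in
+angle brackets:
+
+```sql
+CREATE OR REPLACE CONNECTION S3_CONNECTION
+TO ''
+USER '<AWS_ACCESS_KEY>'
+IDENTIFIED BY 'S3_SECRET_KEY=<AWS_SECRET_KEY>';
+```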
+
+Using AWS access and secret keys:
+
+```sql
+CREATE OR REPLACE CONNECTION S3_CONNECTION
+TO ''
+USER ''
+IDENTIFIED BY 'S3_ACCESS_KEY=;S3_SECRET_KEY=';
+```
+
+Or together with session token:
+
+```sql
+CREATE OR REPLACE CONNECTION S3_CONNECTION
+TO ''
+USER ''
+IDENTIFIED BY 'S3_ACCESS_KEY=;S3_SECRET_KEY=;S3_SESSION_TOKEN=';
+```
+
+#### Import
 
 ```sql
 IMPORT INTO RETAIL.SALES_POSITIONS
 FROM SCRIPT ETL.IMPORT_PATH WITH
-  BUCKET_PATH = 's3a:///data/orc/sales_positions/*'
-  DATA_FORMAT = 'ORC'
-  S3_ACCESS_KEY = ''
-  S3_SECRET_KEY = ''
-  S3_ENDPOINT = 's3..amazonaws.com'
-  PARALLELISM = 'nproc()*2';
+  BUCKET_PATH = 's3a:///import/orc/sales_positions/*'
+  DATA_FORMAT = 'ORC'
+  S3_ENDPOINT = 's3..amazonaws.com'
+  CONNECTION_NAME = 'S3_CONNECTION'
+  PARALLELISM = 'nproc()*2';
 ```
 
-### Export to S3
+#### Export
 
 ```sql
 EXPORT RETAIL.SALES_POSITIONS
 INTO SCRIPT ETL.EXPORT_PATH WITH
-  BUCKET_PATH = 's3a:///data/parquet/sales_positions/'
+  BUCKET_PATH = 's3a:///export/parquet/sales_positions/'
+  DATA_FORMAT = 'PARQUET'
+  S3_ENDPOINT = 's3..amazonaws.com'
+  CONNECTION_NAME = 'S3_CONNECTION'
+  PARALLELISM = 'iproc(), floor(random()*2)';
+```
+
+### Using UDF Parameters
+
+In this case, you should provide the access credentials as key-value parameters.
+
+#### Import
+
+```sql
+IMPORT INTO RETAIL.SALES_POSITIONS
+FROM SCRIPT ETL.IMPORT_PATH WITH
+  BUCKET_PATH = 's3a:///import/orc/sales_positions/*'
+  DATA_FORMAT = 'ORC'
+  S3_ACCESS_KEY = ''
+  S3_SECRET_KEY = ''
+  S3_SESSION_TOKEN = ''
+  S3_ENDPOINT = 's3..amazonaws.com'
+  PARALLELISM = 'nproc()*2';
+```
+
+#### Export
+
+```sql
+EXPORT RETAIL.SALES_POSITIONS
+INTO SCRIPT ETL.EXPORT_PATH WITH
+  BUCKET_PATH = 's3a:///export/parquet/sales_positions/'
   DATA_FORMAT = 'PARQUET'
   S3_ACCESS_KEY = ''
   S3_SECRET_KEY = ''
@@ -123,24 +186,26 @@ curl -X PUT -T gcp--service-keyfile.json \
 Please make sure that the bucket is **secure** and only **readable by users**
 who run the UDFs.
 
-### Import from GCS
+### Using UDF Parameters
+
+#### Import from GCS
 
 ```sql
 IMPORT INTO RETAIL.SALES_POSITIONS
 FROM SCRIPT ETL.IMPORT_PATH WITH
-  BUCKET_PATH = 'gs:///data/avro/sales_positions/*'
+  BUCKET_PATH = 'gs:///import/avro/sales_positions/*'
   DATA_FORMAT = 'AVRO'
   GCS_PROJECT_ID = ''
   GCS_KEYFILE_PATH = '/buckets/bfsdefault//gcp--service-keyfile.json'
   PARALLELISM = 'nproc()*4';
 ```
 
-### Export to GCS
+#### Export to GCS
 
 ```sql
 EXPORT RETAIL.SALES_POSITIONS
 INTO SCRIPT ETL.EXPORT_PATH WITH
-  BUCKET_PATH = 'gs:///data/parquet/sales_positions/'
+  BUCKET_PATH = 'gs:///export/parquet/sales_positions/'
   DATA_FORMAT = 'PARQUET'
   GCS_PROJECT_ID = ''
   GCS_KEYFILE_PATH = '/buckets/bfsdefault//gcp--service-keyfile.json'
@@ -157,8 +222,8 @@ learn more about Exasol synchronous cluster filesystem BucketFS.
 
 ## Azure Blob Storage
 
-Azure Blob Storage containers can be accessed using two possible authotization
-mechanism.
+Azure Blob Storage containers can be accessed using two possible authorization
+mechanisms.
 
 - ``AZURE_SECRET_KEY``
 - ``AZURE_SAS_TOKEN``
 
@@ -167,69 +232,130 @@ The **AZURE_SECRET_KEY** is 512-bit storage access keys that can be generated
 after creating a storage account. It is used to authorize access to the storage
 accounts.
 
-THE **AZURE_SAS_TOKEN** is Shared Access Signature (SAS) that provides secure
+The **AZURE_SAS_TOKEN** is a Shared Access Signature (SAS) that provides secure
 access to storage account with granular control over how the clients can
 access the data.
 You should provider either one of these parameters when using
 cloud-storage-elt-udfs to access the Azure Blob Storage containers.
 
-Additionally, you need to obtain the Azure Blob store account name and container
-name and provide them as UDF parameters.
-
-- ``AZURE_ACCOUNT_NAME``
-- ``AZURE_CONTAINER_NAME``
-
-The **AZURE_CONTAINER_NAME** parameter is optional if you are using storage
-account access keys. However, it should still be available in the
-``BUCKET_PATH`` property value string.
-
 Please refer to Azure documentation on [creating storage
 account][azure-blob-account], managing [storage access
 keys][azure-blob-keys] and using [shared access signatures
 (SAS)][azure-blob-sas].
 
-### Import from Blob Storage using access key
+### Using connection object
+
+Create a named connection object for use with the Azure secret key:
+
+```sql
+CREATE OR REPLACE CONNECTION AZURE_BLOB_SECRET_CONNECTION
+TO ''
+USER ''
+IDENTIFIED BY 'AZURE_SECRET_KEY=';
+```
+
+Or one for use with the Azure SAS token:
+
+```sql
+CREATE OR REPLACE CONNECTION AZURE_BLOB_SAS_CONNECTION
+TO ''
+USER ''
+IDENTIFIED BY 'AZURE_SAS_TOKEN=';
+```
+
+#### Import using secret key connection object
 
 ```sql
 IMPORT INTO RETAIL.SALES_POSITIONS
 FROM SCRIPT ETL.IMPORT_PATH WITH
-  BUCKET_PATH = 'wasbs://@.blob.core.windows.net/data/orc/sales-positions/*'
-  DATA_FORMAT = 'ORC'
-  AZURE_ACCOUNT_NAME = ''
-  AZURE_SECRET_KEY = ''
-  PARALLELISM = 'nproc()';
+  BUCKET_PATH = 'wasbs://@.blob.core.windows.net/import/orc/*'
+  DATA_FORMAT = 'ORC'
+  CONNECTION_NAME = 'AZURE_BLOB_SECRET_CONNECTION'
+  PARALLELISM = 'nproc()';
 ```
 
-### Import from Blob Storage using SAS token
+#### Import using SAS token connection object
 
 ```sql
 IMPORT INTO RETAIL.SALES_POSITIONS
 FROM SCRIPT ETL.IMPORT_PATH WITH
-  BUCKET_PATH = 'wasbs://@.blob.core.windows.net/data/orc/sales-positions/*'
-  DATA_FORMAT = 'ORC'
-  AZURE_ACCOUNT_NAME = ''
-  AZURE_CONTAINER_NAME = ''
-  AZURE_SAS_TOKEN = ''
-  PARALLELISM = 'nproc()';
+  BUCKET_PATH = 'wasbs://@.blob.core.windows.net/import/orc/*'
+  DATA_FORMAT = 'ORC'
+  CONNECTION_NAME = 'AZURE_BLOB_SAS_CONNECTION'
+  PARALLELISM = 'nproc()';
 ```
 
-### Export to Blob Storage
+#### Export using secret key connection object
 
 ```sql
 EXPORT RETAIL.SALES_POSITIONS
 INTO SCRIPT ETL.EXPORT_PATH WITH
-  BUCKET_PATH = 'wasbs://@.blob.core.windows.net/data/parquet/sales-positions/'
-  DATA_FORMAT = 'PARQUET'
-  AZURE_ACCOUNT_NAME = ''
-  AZURE_SECRET_KEY = ''
-  PARALLELISM = 'iproc()';
+  BUCKET_PATH = 'wasbs://@.blob.core.windows.net/export/parquet/'
+  DATA_FORMAT = 'PARQUET'
+  CONNECTION_NAME = 'AZURE_BLOB_SECRET_CONNECTION'
+  PARALLELISM = 'iproc()';
 ```
 
-Similar to import, you can also use the SAS token when exporting.
+#### Export using SAS token connection object + +```sql +EXPORT RETAIL.SALES_POSITIONS +INTO SCRIPT ETL.EXPORT_PATH WITH + BUCKET_PATH = 'wasbs://@.blob.core.windows.net/export/parquet/' + DATA_FORMAT = 'PARQUET' + CONNECTION_NAME = 'AZURE_BLOB_SAS_CONNECTION' + PARALLELISM = 'iproc()'; +``` + +### Using UDF Parameters + +#### Import using secret key + +```sql +IMPORT INTO RETAIL.SALES_POSITIONS +FROM SCRIPT ETL.IMPORT_PATH WITH + BUCKET_PATH = 'wasbs://@.blob.core.windows.net/import/orc/*' + DATA_FORMAT = 'ORC' + AZURE_SECRET_KEY = '' + PARALLELISM = 'nproc()'; +``` + +#### Import using SAS token + +```sql +IMPORT INTO RETAIL.SALES_POSITIONS +FROM SCRIPT ETL.IMPORT_PATH WITH + BUCKET_PATH = 'wasbs://@.blob.core.windows.net/import/orc/*' + DATA_FORMAT = 'ORC' + AZURE_SAS_TOKEN = '' + PARALLELISM = 'nproc()'; +``` + +#### Export using secret key + +```sql +EXPORT RETAIL.SALES_POSITIONS +INTO SCRIPT ETL.EXPORT_PATH WITH + BUCKET_PATH = 'wasbs://@.blob.core.windows.net/export/parquet/' + DATA_FORMAT = 'PARQUET' + AZURE_SECRET_KEY = '' + PARALLELISM = 'iproc()'; +``` + +#### Export using SAS token + +```sql +EXPORT RETAIL.SALES_POSITIONS +INTO SCRIPT ETL.EXPORT_PATH WITH + BUCKET_PATH = 'wasbs://@.blob.core.windows.net/export/parquet/' + DATA_FORMAT = 'PARQUET' + AZURE_SAS_TOKEN = '' + PARALLELISM = 'iproc()'; +``` The Azure Blob Storage container path URI scheme can be `wasbs` or `wasb`. -## Azure Data Lake Storage +## Azure Data Lake (Gen1) Storage Currently only Azure Data Lake Storage Gen1 version is supported. @@ -255,12 +381,48 @@ documentation pages should show how obtain required configuration settings. Finally, make sure that the client id has an access permissions to the Gen1 storage container or its child directories. -### Import from Data Lake (Gen1) Storage +### Using connection object + +Create a named connection object that includes secure credentials for Azure ADLS +Storage in the identification field: + +```sql +CREATE OR REPLACE CONNECTION AZURE_ADLS_CONNECTION +TO '' +USER '' +IDENTIFIED BY 'AZURE_CLIENT_ID=;AZURE_CLIENT_SECRET=;AZURE_DIRECTORY_ID='; +``` + +#### Import + +```sql +IMPORT INTO RETAIL.SALES_POSITIONS +FROM SCRIPT ETL.IMPORT_PATH WITH + BUCKET_PATH = 'adl://.azuredatalakestore.net/import/avro/*' + DATA_FORMAT = 'AVRO' + CONNECTION_NAME = 'AZURE_ADLS_CONNECTION' + PARALLELISM = 'nproc()'; +``` + +#### Export + +```sql +EXPORT RETAIL.SALES_POSITIONS +INTO SCRIPT ETL.EXPORT_PATH WITH + BUCKET_PATH = 'adl://.azuredatalakestore.net/export/parquet/' + DATA_FORMAT = 'PARQUET' + CONNECTION_NAME = 'AZURE_ADLS_CONNECTION' + PARALLELISM = 'iproc()'; +``` + +### Using UDF Parameters + +#### Import ```sql IMPORT INTO RETAIL.SALES_POSITIONS FROM SCRIPT ETL.IMPORT_PATH WITH - BUCKET_PATH = 'adl://.azuredatalakestore.net/data/avro/sales_positions/*' + BUCKET_PATH = 'adl://.azuredatalakestore.net/import/avro/*' DATA_FORMAT = 'AVRO' AZURE_CLIENT_ID = '' AZURE_CLIENT_SECRET = '' @@ -268,12 +430,12 @@ FROM SCRIPT ETL.IMPORT_PATH WITH PARALLELISM = 'nproc()'; ``` -### Export to Data Lake (Gen1) Storage +#### Export ```sql EXPORT RETAIL.SALES_POSITIONS INTO SCRIPT ETL.EXPORT_PATH WITH - BUCKET_PATH = 'adl://.azuredatalakestore.net/data/parquet/sales_positions/' + BUCKET_PATH = 'adl://.azuredatalakestore.net/export/parquet/' DATA_FORMAT = 'PARQUET' AZURE_CLIENT_ID = '' AZURE_CLIENT_SECRET = '' @@ -283,6 +445,7 @@ INTO SCRIPT ETL.EXPORT_PATH WITH The container path should start with `adl` URI scheme. 
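+
+Based on how the new `AzureAdlsBucket` merges the connection object in this
+change, the connection's `USER` field can hold the client id instead of
+repeating it in the `IDENTIFIED BY` field. A minimal sketch, assuming
+placeholder values in angle brackets:
+
+```sql
+CREATE OR REPLACE CONNECTION AZURE_ADLS_CONNECTION
+TO ''
+USER '<AZURE_CLIENT_ID>'
+IDENTIFIED BY 'AZURE_CLIENT_SECRET=<AZURE_CLIENT_SECRET>;AZURE_DIRECTORY_ID=<AZURE_DIRECTORY_ID>';
+```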
+[exa-connection]: https://docs.exasol.com/sql/create_connection.htm [aws-creds]: https://docs.aws.amazon.com/general/latest/gr/aws-sec-cred-types.html [gcp-projects]: https://cloud.google.com/resource-manager/docs/creating-managing-projects [gcp-auth-intro]: https://cloud.google.com/compute/docs/access/service-accounts diff --git a/src/main/scala/com/exasol/cloudetl/bucket/AzureAdlsBucket.scala b/src/main/scala/com/exasol/cloudetl/bucket/AzureAdlsBucket.scala index 7dc8ccb9..5b0076af 100644 --- a/src/main/scala/com/exasol/cloudetl/bucket/AzureAdlsBucket.scala +++ b/src/main/scala/com/exasol/cloudetl/bucket/AzureAdlsBucket.scala @@ -5,7 +5,9 @@ import com.exasol.cloudetl.storage.StorageProperties import org.apache.hadoop.conf.Configuration /** A [[Bucket]] implementation for the Azure Data Lake Storage */ -final case class AzureAdlsBucket(path: String, params: StorageProperties) extends Bucket { +final case class AzureAdlsBucket(path: String, params: StorageProperties) + extends Bucket + with SecureBucket { private[this] val AZURE_CLIENT_ID: String = "AZURE_CLIENT_ID" private[this] val AZURE_CLIENT_SECRET: String = "AZURE_CLIENT_SECRET" @@ -22,8 +24,18 @@ final case class AzureAdlsBucket(path: String, params: StorageProperties) extend * Storage. */ override def getRequiredProperties(): Seq[String] = + Seq.empty[String] + + /** @inheritdoc */ + override def getSecureProperties(): Seq[String] = Seq(AZURE_CLIENT_ID, AZURE_CLIENT_SECRET, AZURE_DIRECTORY_ID) + /** @inheritdoc */ + override def validate(): Unit = { + validateRequiredProperties() + validateConnectionProperties() + } + /** * @inheritdoc * @@ -34,9 +46,14 @@ final case class AzureAdlsBucket(path: String, params: StorageProperties) extend validate() val conf = new Configuration() - val clientId = properties.getString(AZURE_CLIENT_ID) - val clientSecret = properties.getString(AZURE_CLIENT_SECRET) - val directoryId = properties.getString(AZURE_DIRECTORY_ID) + val mergedProperties = if (properties.hasNamedConnection()) { + properties.merge(AZURE_CLIENT_ID) + } else { + properties + } + val clientId = mergedProperties.getString(AZURE_CLIENT_ID) + val clientSecret = mergedProperties.getString(AZURE_CLIENT_SECRET) + val directoryId = mergedProperties.getString(AZURE_DIRECTORY_ID) val tokenEndpoint = s"https://login.microsoftonline.com/$directoryId/oauth2/token" conf.set("fs.adl.impl", classOf[org.apache.hadoop.fs.adl.AdlFileSystem].getName) conf.set("fs.AbstractFileSystem.adl.impl", classOf[org.apache.hadoop.fs.adl.Adl].getName) diff --git a/src/main/scala/com/exasol/cloudetl/bucket/AzureBlobBucket.scala b/src/main/scala/com/exasol/cloudetl/bucket/AzureBlobBucket.scala index 098254d7..9a99401c 100644 --- a/src/main/scala/com/exasol/cloudetl/bucket/AzureBlobBucket.scala +++ b/src/main/scala/com/exasol/cloudetl/bucket/AzureBlobBucket.scala @@ -5,7 +5,9 @@ import com.exasol.cloudetl.storage.StorageProperties import org.apache.hadoop.conf.Configuration /** A [[Bucket]] implementation for the Azure Blob Storage */ -final case class AzureBlobBucket(path: String, params: StorageProperties) extends Bucket { +final case class AzureBlobBucket(path: String, params: StorageProperties) + extends Bucket + with SecureBucket { private[this] val AZURE_ACCOUNT_NAME: String = "AZURE_ACCOUNT_NAME" private[this] val AZURE_CONTAINER_NAME: String = "AZURE_CONTAINER_NAME" @@ -24,20 +26,14 @@ final case class AzureBlobBucket(path: String, params: StorageProperties) extend override def getRequiredProperties(): Seq[String] = Seq(AZURE_ACCOUNT_NAME) - /** - * 
Validation method specific to the Azure Blob Storage. - * - * Validates required properties by calling parent {@code validate} - * method. Furthermore, checks whether either [[AZURE_SECRET_KEY]] or - * [[AZURE_SAS_TOKEN]] parameters are available. - */ - private[this] def validateExtra(): Unit = { - validate() - if (!properties.containsKey(AZURE_SECRET_KEY) && !properties.containsKey(AZURE_SAS_TOKEN)) { - throw new IllegalArgumentException( - s"Please provide a value for either $AZURE_SECRET_KEY or $AZURE_SAS_TOKEN!" - ) - } + /** @inheritdoc */ + override def getSecureProperties(): Seq[String] = + Seq(AZURE_SECRET_KEY, AZURE_SAS_TOKEN) + + /** @inheritdoc */ + override def validate(): Unit = { + validateRequiredProperties() + validateConnectionProperties() } /** @@ -47,7 +43,7 @@ final case class AzureBlobBucket(path: String, params: StorageProperties) extend * in order to create a configuration. */ override def getConfiguration(): Configuration = { - validateExtra() + validate() val conf = new Configuration() conf.set("fs.azure", classOf[org.apache.hadoop.fs.azure.NativeAzureFileSystem].getName) @@ -59,13 +55,19 @@ final case class AzureBlobBucket(path: String, params: StorageProperties) extend classOf[org.apache.hadoop.fs.azure.Wasbs].getName ) - val accountName = properties.getString(AZURE_ACCOUNT_NAME) - if (properties.containsKey(AZURE_SAS_TOKEN)) { - val sasToken = properties.getString(AZURE_SAS_TOKEN) - val containerName = properties.getString(AZURE_CONTAINER_NAME) + val mergedProperties = if (properties.hasNamedConnection()) { + properties.merge(AZURE_ACCOUNT_NAME) + } else { + properties + } + + val accountName = mergedProperties.getString(AZURE_ACCOUNT_NAME) + if (mergedProperties.containsKey(AZURE_SAS_TOKEN)) { + val sasToken = mergedProperties.getString(AZURE_SAS_TOKEN) + val containerName = mergedProperties.getString(AZURE_CONTAINER_NAME) conf.set(s"fs.azure.sas.$containerName.$accountName.blob.core.windows.net", sasToken) } else { - val secretKey = properties.getString(AZURE_SECRET_KEY) + val secretKey = mergedProperties.getString(AZURE_SECRET_KEY) conf.set(s"fs.azure.account.key.$accountName.blob.core.windows.net", secretKey) } diff --git a/src/main/scala/com/exasol/cloudetl/bucket/Bucket.scala b/src/main/scala/com/exasol/cloudetl/bucket/Bucket.scala index 2d311458..4f2ff4a3 100644 --- a/src/main/scala/com/exasol/cloudetl/bucket/Bucket.scala +++ b/src/main/scala/com/exasol/cloudetl/bucket/Bucket.scala @@ -20,7 +20,7 @@ import org.apache.hadoop.fs.Path * * All specific implementation of a bucket should extend this class. */ -abstract class Bucket { +abstract class Bucket extends LazyLogging { /** The path string of the bucket. */ val bucketPath: String @@ -34,11 +34,19 @@ abstract class Bucket { */ def getRequiredProperties(): Seq[String] - /** Validates that all required parameter key values are available. */ - final def validate(): Unit = - validateRequiredProperties() + /** + * Creates a Hadoop [[org.apache.hadoop.conf.Configuration]] for this + * specific bucket type. + */ + def getConfiguration(): Configuration - private[this] def validateRequiredProperties(): Unit = + /** + * Validates that user provided key-value properties are available for + * this bucket implementation. 
+ */ + def validate(): Unit + + protected[this] final def validateRequiredProperties(): Unit = getRequiredProperties().foreach { key => if (!properties.containsKey(key)) { throw new IllegalArgumentException( @@ -47,12 +55,6 @@ abstract class Bucket { } } - /** - * Creates a Hadoop [[org.apache.hadoop.conf.Configuration]] for this - * specific bucket type. - */ - def getConfiguration(): Configuration - /** * The Hadoop [[org.apache.hadoop.fs.FileSystem]] for this specific * bucket path. @@ -101,14 +103,4 @@ object Bucket extends LazyLogging { } } - /** - * Creates specific [[Bucket]] class using the path scheme from - * key-value properties. - * - * @param params The key value parameters - * @return A [[Bucket]] class for the given path - */ - def apply(params: Map[String, String]): Bucket = - apply(StorageProperties(params)) - } diff --git a/src/main/scala/com/exasol/cloudetl/bucket/GCSBucket.scala b/src/main/scala/com/exasol/cloudetl/bucket/GCSBucket.scala index d85d107e..649ffdc7 100644 --- a/src/main/scala/com/exasol/cloudetl/bucket/GCSBucket.scala +++ b/src/main/scala/com/exasol/cloudetl/bucket/GCSBucket.scala @@ -16,6 +16,10 @@ final case class GCSBucket(path: String, params: StorageProperties) extends Buck /** @inheritdoc */ override val properties: StorageProperties = params + /** @inheritdoc */ + override def validate(): Unit = + validateRequiredProperties() + /** * Returns the list of required property keys for Google Cloud * Storage. diff --git a/src/main/scala/com/exasol/cloudetl/bucket/LocalBucket.scala b/src/main/scala/com/exasol/cloudetl/bucket/LocalBucket.scala index 5f9e6b5f..541d2524 100644 --- a/src/main/scala/com/exasol/cloudetl/bucket/LocalBucket.scala +++ b/src/main/scala/com/exasol/cloudetl/bucket/LocalBucket.scala @@ -18,6 +18,10 @@ final case class LocalBucket(path: String, params: StorageProperties) extends Bu /** @inheritdoc */ override def getRequiredProperties(): Seq[String] = Seq.empty[String] + /** @inheritdoc */ + override def validate(): Unit = + validateRequiredProperties() + /** @inheritdoc */ override def getConfiguration(): Configuration = { validate() diff --git a/src/main/scala/com/exasol/cloudetl/bucket/S3Bucket.scala b/src/main/scala/com/exasol/cloudetl/bucket/S3Bucket.scala index 0e8c8185..114173ae 100644 --- a/src/main/scala/com/exasol/cloudetl/bucket/S3Bucket.scala +++ b/src/main/scala/com/exasol/cloudetl/bucket/S3Bucket.scala @@ -5,11 +5,14 @@ import com.exasol.cloudetl.storage.StorageProperties import org.apache.hadoop.conf.Configuration /** A [[Bucket]] implementation for the AWS S3 */ -final case class S3Bucket(path: String, params: StorageProperties) extends Bucket { +final case class S3Bucket(path: String, params: StorageProperties) + extends Bucket + with SecureBucket { private[this] val S3_ENDPOINT: String = "S3_ENDPOINT" private[this] val S3_ACCESS_KEY: String = "S3_ACCESS_KEY" private[this] val S3_SECRET_KEY: String = "S3_SECRET_KEY" + private[this] val S3_SESSION_TOKEN: String = "S3_SESSION_TOKEN" /** @inheritdoc */ override val bucketPath: String = path @@ -18,8 +21,16 @@ final case class S3Bucket(path: String, params: StorageProperties) extends Bucke override val properties: StorageProperties = params /** Returns the list of required property keys for AWS S3 Storage. 
*/ - override def getRequiredProperties(): Seq[String] = - Seq(S3_ENDPOINT, S3_ACCESS_KEY, S3_SECRET_KEY) + override def getRequiredProperties(): Seq[String] = Seq(S3_ENDPOINT) + + /** @inheritdoc */ + override def getSecureProperties(): Seq[String] = Seq(S3_SECRET_KEY, S3_SESSION_TOKEN) + + /** @inheritdoc */ + override def validate(): Unit = { + validateRequiredProperties() + validateConnectionProperties() + } /** * @inheritdoc @@ -34,8 +45,23 @@ final case class S3Bucket(path: String, params: StorageProperties) extends Bucke conf.set("fs.file.impl", classOf[org.apache.hadoop.fs.LocalFileSystem].getName) conf.set("fs.s3a.impl", classOf[org.apache.hadoop.fs.s3a.S3AFileSystem].getName) conf.set("fs.s3a.endpoint", properties.getString(S3_ENDPOINT)) - conf.set("fs.s3a.access.key", properties.getString(S3_ACCESS_KEY)) - conf.set("fs.s3a.secret.key", properties.getString(S3_SECRET_KEY)) + + val mergedProperties = if (properties.hasNamedConnection()) { + properties.merge(S3_ACCESS_KEY) + } else { + properties + } + + conf.set("fs.s3a.access.key", mergedProperties.getString(S3_ACCESS_KEY)) + conf.set("fs.s3a.secret.key", mergedProperties.getString(S3_SECRET_KEY)) + + if (mergedProperties.containsKey(S3_SESSION_TOKEN)) { + conf.set( + "fs.s3a.aws.credentials.provider", + classOf[org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider].getName + ) + conf.set("fs.s3a.session.token", mergedProperties.getString(S3_SESSION_TOKEN)) + } conf } diff --git a/src/main/scala/com/exasol/cloudetl/bucket/SecureBucket.scala b/src/main/scala/com/exasol/cloudetl/bucket/SecureBucket.scala new file mode 100644 index 00000000..aaef1270 --- /dev/null +++ b/src/main/scala/com/exasol/cloudetl/bucket/SecureBucket.scala @@ -0,0 +1,47 @@ +package com.exasol.cloudetl.bucket + +import com.typesafe.scalalogging.LazyLogging + +/** + * A trait that provides methods to access [[Bucket]]s using secure + * access credentials. + */ +trait SecureBucket extends LazyLogging { self: Bucket => + + /** + * Return the list of property key names that are used as secure + * access credentials. + * + * For example, {@code AWS_SECRET_KEY} when accessing an S3 bucket. + */ + def getSecureProperties(): Seq[String] + + /** + * Validates that the named connection object or access credentials + * are available. + */ + protected[this] final def validateConnectionProperties(): Unit = { + if (hasSecureProperties()) { + logger.info( + "Using secure credential parameters is deprecated. " + + "Please use an Exasol named connection object via CONNECTION_NAME property." + ) + } + val connectionExceptionMessage = + "Please provide either CONNECTION_NAME property or secure access " + + "credentials parameters, but not the both!" 
+ if (properties.hasNamedConnection()) { + if (hasSecureProperties()) { + throw new IllegalArgumentException(connectionExceptionMessage) + } + } else { + if (!hasSecureProperties()) { + throw new IllegalArgumentException(connectionExceptionMessage) + } + } + } + + private[this] def hasSecureProperties(): Boolean = + getSecureProperties.exists(properties.containsKey(_)) + +} diff --git a/src/main/scala/com/exasol/cloudetl/scriptclasses/ExportTable.scala b/src/main/scala/com/exasol/cloudetl/scriptclasses/ExportTable.scala index ab2da7cc..e9906907 100644 --- a/src/main/scala/com/exasol/cloudetl/scriptclasses/ExportTable.scala +++ b/src/main/scala/com/exasol/cloudetl/scriptclasses/ExportTable.scala @@ -17,7 +17,7 @@ import com.typesafe.scalalogging.LazyLogging object ExportTable extends LazyLogging { def run(metadata: ExaMetadata, iterator: ExaIterator): Unit = { - val storageProperties = StorageProperties(iterator.getString(1)) + val storageProperties = StorageProperties(iterator.getString(1), metadata) val bucket = Bucket(storageProperties) val srcColumnNames = iterator.getString(2).split("\\.") val firstColumnIdx = 3 diff --git a/src/main/scala/com/exasol/cloudetl/scriptclasses/ImportFiles.scala b/src/main/scala/com/exasol/cloudetl/scriptclasses/ImportFiles.scala index 7392caae..f8196a0a 100644 --- a/src/main/scala/com/exasol/cloudetl/scriptclasses/ImportFiles.scala +++ b/src/main/scala/com/exasol/cloudetl/scriptclasses/ImportFiles.scala @@ -15,7 +15,7 @@ import org.apache.hadoop.fs.Path object ImportFiles extends LazyLogging { def run(metadata: ExaMetadata, iterator: ExaIterator): Unit = { - val storageProperties = StorageProperties(iterator.getString(1)) + val storageProperties = StorageProperties(iterator.getString(1), metadata) val fileFormat = storageProperties.getFileFormat() val bucket = Bucket(storageProperties) diff --git a/src/main/scala/com/exasol/cloudetl/scriptclasses/ImportMetadata.scala b/src/main/scala/com/exasol/cloudetl/scriptclasses/ImportMetadata.scala index 41c6f178..7ea08759 100644 --- a/src/main/scala/com/exasol/cloudetl/scriptclasses/ImportMetadata.scala +++ b/src/main/scala/com/exasol/cloudetl/scriptclasses/ImportMetadata.scala @@ -17,7 +17,7 @@ object ImportMetadata extends LazyLogging { + s"with parallelism: ${parallelism.toString}" ) - val storageProperties = StorageProperties(iterator.getString(1)) + val storageProperties = StorageProperties(iterator.getString(1), metadata) val bucket = Bucket(storageProperties) val paths = bucket.getPaths().filter(p => !p.getName().startsWith("_")) logger.info(s"Total number of files: ${paths.size} in bucket path: $bucketPath") diff --git a/src/main/scala/com/exasol/cloudetl/scriptclasses/ImportPath.scala b/src/main/scala/com/exasol/cloudetl/scriptclasses/ImportPath.scala index 470026ad..762dad3e 100644 --- a/src/main/scala/com/exasol/cloudetl/scriptclasses/ImportPath.scala +++ b/src/main/scala/com/exasol/cloudetl/scriptclasses/ImportPath.scala @@ -17,18 +17,18 @@ object ImportPath { val bucket = Bucket(storageProperties) bucket.validate() + val scriptSchema = metadata.getScriptSchema val bucketPath = bucket.bucketPath val parallelism = storageProperties.getParallelism("nproc()") - val storagePropertiesStr = storageProperties.mkString() - val scriptSchema = metadata.getScriptSchema + val storagePropertiesAsString = storageProperties.mkString() s"""SELECT | $scriptSchema.IMPORT_FILES( - | '$bucketPath', '$storagePropertiesStr', filename + | '$bucketPath', '$storagePropertiesAsString', filename |) |FROM ( | SELECT 
$scriptSchema.IMPORT_METADATA( - | '$bucketPath', '$storagePropertiesStr', $parallelism + | '$bucketPath', '$storagePropertiesAsString', $parallelism | ) |) |GROUP BY diff --git a/src/main/scala/com/exasol/cloudetl/storage/StorageProperties.scala b/src/main/scala/com/exasol/cloudetl/storage/StorageProperties.scala index a7a79c20..0be412c5 100644 --- a/src/main/scala/com/exasol/cloudetl/storage/StorageProperties.scala +++ b/src/main/scala/com/exasol/cloudetl/storage/StorageProperties.scala @@ -2,6 +2,8 @@ package com.exasol.cloudetl.storage import java.net.URI +import com.exasol.ExaConnectionInformation +import com.exasol.ExaMetadata import com.exasol.cloudetl.common.AbstractProperties import com.exasol.cloudetl.common.CommonProperties @@ -11,8 +13,10 @@ import com.exasol.cloudetl.common.CommonProperties * provided key-value parameters for storage import and export * user-defined-functions (udfs). */ -class StorageProperties(private val properties: Map[String, String]) - extends AbstractProperties(properties) { +class StorageProperties( + private val properties: Map[String, String], + private val exaMetadata: Option[ExaMetadata] +) extends AbstractProperties(properties) { import StorageProperties._ @@ -46,6 +50,57 @@ class StorageProperties(private val properties: Map[String, String]) final def getParallelism(defaultValue: => String): String = get(PARALLELISM).fold(defaultValue)(identity) + /** + * Returns an Exasol [[ExaConnectionInformation]] named connection + * information. + */ + @SuppressWarnings(Array("org.wartremover.warts.AsInstanceOf")) + final def getConnectionInformation(): ExaConnectionInformation = + exaMetadata.fold { + throw new IllegalArgumentException("Exasol metadata is None!") + }(_.getConnection(getString(CONNECTION_NAME))) + + /** Checks if the Exasol named connection property is provided. */ + final def hasNamedConnection(): Boolean = + containsKey(CONNECTION_NAME) + + /** + * Returns a new [[StorageProperties]] what merges the key-value pairs + * parsed from user provided Exasol named connection object. + */ + final def merge(accountName: String): StorageProperties = { + val connectionParsedMap = parseConnectionInfo(accountName) + val newProperties = properties ++ connectionParsedMap + new StorageProperties(newProperties, exaMetadata) + } + + /** + * Parses the connection object password into key-value map pairs. + * + * If the connection object contains the username, it is mapped to the + * {@code keyForUsername} parameter. However, this value is + * overwritted if the provided key is available in password string of + * connection object. + */ + private[this] def parseConnectionInfo(keyForUsername: String): Map[String, String] = { + val connection = getConnectionInformation() + val username = connection.getUser() + val password = connection.getPassword(); + val map = password + .split(";") + .map { str => + val idx = str.indexOf('=') + str.substring(0, idx) -> str.substring(idx + 1) + } + .toMap + + if (username.isEmpty()) { + map + } else { + Map(keyForUsername -> username) ++ map + } + } + /** * Returns a string value of key-value property pairs. * @@ -72,9 +127,26 @@ object StorageProperties extends CommonProperties { /** An optional property key name for the parallelism. */ private[storage] final val PARALLELISM: String = "PARALLELISM" - /** Returns [[StorageProperties]] from key-value pairs map. */ + /** An optional property key name for the named connection object. 
*/ + private[storage] final val CONNECTION_NAME: String = "CONNECTION_NAME" + + /** + * Returns [[StorageProperties]] from key values map and + * [[ExaMetadata]] metadata object. + */ + def apply(params: Map[String, String], metadata: ExaMetadata): StorageProperties = + new StorageProperties(params, Option(metadata)) + + /** Returns [[StorageProperties]] from only key-value pairs map. */ def apply(params: Map[String, String]): StorageProperties = - new StorageProperties(params) + new StorageProperties(params, None) + + /** + * Returns [[StorageProperties]] from properly separated string and + * [[ExaMetadata]] metadata object. + */ + def apply(string: String, metadata: ExaMetadata): StorageProperties = + apply(mapFromString(string), metadata) /** Returns [[StorageProperties]] from properly separated string. */ def apply(string: String): StorageProperties = diff --git a/src/test/scala/com/exasol/cloudetl/bucket/AbstractBucketTest.scala b/src/test/scala/com/exasol/cloudetl/bucket/AbstractBucketTest.scala index bfffe58c..a38f0ae1 100644 --- a/src/test/scala/com/exasol/cloudetl/bucket/AbstractBucketTest.scala +++ b/src/test/scala/com/exasol/cloudetl/bucket/AbstractBucketTest.scala @@ -1,9 +1,16 @@ package com.exasol.cloudetl.bucket +import com.exasol.ExaConnectionInformation +import com.exasol.ExaMetadata +import com.exasol.cloudetl.storage.StorageProperties + +import org.mockito.Mockito.when import org.scalatest.BeforeAndAfterEach import org.scalatest.FunSuite +import org.scalatest.mockito.MockitoSugar -class AbstractBucketTest extends FunSuite with BeforeAndAfterEach { +@SuppressWarnings(Array("org.wartremover.warts.Overloading")) +class AbstractBucketTest extends FunSuite with BeforeAndAfterEach with MockitoSugar { private[bucket] val PATH: String = "BUCKET_PATH" private[bucket] val FORMAT: String = "DATA_FORMAT" @@ -14,4 +21,28 @@ class AbstractBucketTest extends FunSuite with BeforeAndAfterEach { () } + protected[this] final def getBucket(params: Map[String, String]): Bucket = + Bucket(StorageProperties(params)) + + protected[this] final def getBucket( + params: Map[String, String], + exaMetadata: ExaMetadata + ): Bucket = + Bucket(StorageProperties(params, exaMetadata)) + + protected[this] final def mockConnectionInfo( + username: String, + password: String + ): ExaMetadata = { + val metadata = mock[ExaMetadata] + val connectionInfo: ExaConnectionInformation = new ExaConnectionInformation() { + override def getType(): ExaConnectionInformation.ConnectionType = + ExaConnectionInformation.ConnectionType.PASSWORD + override def getAddress(): String = "" + override def getUser(): String = username + override def getPassword(): String = password + } + when(metadata.getConnection("connection_info")).thenReturn(connectionInfo) + metadata + } } diff --git a/src/test/scala/com/exasol/cloudetl/bucket/AzureAdlsBucketTest.scala b/src/test/scala/com/exasol/cloudetl/bucket/AzureAdlsBucketTest.scala new file mode 100644 index 00000000..6ef68b8e --- /dev/null +++ b/src/test/scala/com/exasol/cloudetl/bucket/AzureAdlsBucketTest.scala @@ -0,0 +1,77 @@ +package com.exasol.cloudetl.bucket + +@SuppressWarnings(Array("org.wartremover.warts.IsInstanceOf")) +class AzureAdlsBucketTest extends AbstractBucketTest { + + private[this] val defaultProperties = Map( + PATH -> "adl://container1.azuredatalakestore.net/avro-data/*", + FORMAT -> "AVRO" + ) + + private[this] val clientID = "clientID" + private[this] val clientSecret = "clientSecret" + private[this] val directoryID = "directoryID" + + private[this] val 
configMappings = Map( + "dfs.adls.oauth2.client.id" -> clientID, + "dfs.adls.oauth2.credential" -> clientSecret, + "dfs.adls.oauth2.refresh.url" -> s"https://login.microsoftonline.com/$directoryID/oauth2/token" + ) + + private[this] def assertAzureAdlsBucket( + bucket: Bucket, + extraMappings: Map[String, String] + ): Unit = { + assert(bucket.isInstanceOf[AzureAdlsBucket]) + val conf = bucket.getConfiguration() + val defaultMappings = Map( + "fs.adl.impl" -> classOf[org.apache.hadoop.fs.adl.AdlFileSystem].getName, + "fs.AbstractFileSystem.adl.impl" -> classOf[org.apache.hadoop.fs.adl.Adl].getName, + "dfs.adls.oauth2.access.token.provider.type" -> "ClientCredential" + ) + (defaultMappings ++ extraMappings).foreach { + case (given, expected) => + assert(conf.get(given) === expected) + } + } + + test("apply throws if no connection name or credentials is provided") { + properties = defaultProperties + val thrown = intercept[IllegalArgumentException] { + assertAzureAdlsBucket(getBucket(properties), Map.empty[String, String]) + } + val expected = "Please provide either CONNECTION_NAME property or secure access " + + "credentials parameters, but not the both!" + assert(thrown.getMessage === expected) + } + + test("apply returns AzureAdlsBucket with client id, client secret and directory id") { + properties = defaultProperties ++ Map( + "AZURE_CLIENT_ID" -> clientID, + "AZURE_CLIENT_SECRET" -> clientSecret, + "AZURE_DIRECTORY_ID" -> directoryID + ) + val bucket = getBucket(properties) + assertAzureAdlsBucket(bucket, configMappings) + } + + test("apply returns with credentails from username and password of connection object") { + properties = defaultProperties ++ Map("CONNECTION_NAME" -> "connection_info") + val exaMetadata = mockConnectionInfo( + clientID, + s"AZURE_CLIENT_SECRET=$clientSecret;AZURE_DIRECTORY_ID=$directoryID" + ) + val bucket = getBucket(properties, exaMetadata) + assertAzureAdlsBucket(bucket, configMappings) + } + + test("apply returns with credentails from password of connection object") { + properties = defaultProperties ++ Map("CONNECTION_NAME" -> "connection_info") + val connectionInfoPassword = s"AZURE_CLIENT_ID=$clientID;" + + s"AZURE_CLIENT_SECRET=$clientSecret;AZURE_DIRECTORY_ID=$directoryID" + val exaMetadata = mockConnectionInfo("", connectionInfoPassword) + val bucket = getBucket(properties, exaMetadata) + assertAzureAdlsBucket(bucket, configMappings) + } + +} diff --git a/src/test/scala/com/exasol/cloudetl/bucket/AzureBlobBucketTest.scala b/src/test/scala/com/exasol/cloudetl/bucket/AzureBlobBucketTest.scala index 27f9c601..5d4fa4d7 100644 --- a/src/test/scala/com/exasol/cloudetl/bucket/AzureBlobBucketTest.scala +++ b/src/test/scala/com/exasol/cloudetl/bucket/AzureBlobBucketTest.scala @@ -7,44 +7,46 @@ import org.apache.hadoop.fs.azure.Wasbs @SuppressWarnings(Array("org.wartremover.warts.IsInstanceOf")) class AzureBlobBucketTest extends AbstractBucketTest { - private[this] def assertAzureBlobBucket(bucket: Bucket): Unit = { + private[this] val defaultProperties = Map( + PATH -> "wasbs://container@account1.windows.net/orc-data/", + FORMAT -> "ORC" + ) + + private[this] def assertAzureBlobBucket( + bucket: Bucket, + extraMappings: Map[String, String] + ): Unit = { assert(bucket.isInstanceOf[AzureBlobBucket]) val conf = bucket.getConfiguration() - val mappings = Map( + val defaultMappings = Map( "fs.azure" -> classOf[NativeAzureFileSystem].getName, "fs.wasb.impl" -> classOf[NativeAzureFileSystem].getName, "fs.wasbs.impl" -> classOf[NativeAzureFileSystem].getName, 
"fs.AbstractFileSystem.wasb.impl" -> classOf[Wasb].getName, "fs.AbstractFileSystem.wasbs.impl" -> classOf[Wasbs].getName ) - mappings.foreach { + (defaultMappings ++ extraMappings).foreach { case (given, expected) => assert(conf.get(given) === expected) } } - private[this] val defaultProperties = Map( - PATH -> "wasbs://container@account1/parquet-bucket/", - FORMAT -> "ORC" - ) - - test("getConfiguration throws if account name is not provided") { + test("apply throws if account name is not provided") { properties = defaultProperties val thrown = intercept[IllegalArgumentException] { - assertAzureBlobBucket(Bucket(properties)) + assertAzureBlobBucket(getBucket(properties), Map.empty[String, String]) } assert(thrown.getMessage === "Please provide a value for the AZURE_ACCOUNT_NAME property!") } - test("getConfiguration throws if neither secret key nor sas tokan account is provided") { + test("apply throws if no connection name or credential (secret key or sas token) is provided") { properties = defaultProperties ++ Map("AZURE_ACCOUNT_NAME" -> "account1") val thrown = intercept[IllegalArgumentException] { - assertAzureBlobBucket(Bucket(properties)) + assertAzureBlobBucket(getBucket(properties), Map.empty[String, String]) } - assert( - thrown.getMessage === "Please provide a value for either " + - "AZURE_SECRET_KEY or AZURE_SAS_TOKEN!" - ) + val expected = "Please provide either CONNECTION_NAME property or secure access " + + "credentials parameters, but not the both!" + assert(thrown.getMessage === expected) } test("apply returns AzureBlobBucket with secret key") { @@ -52,12 +54,10 @@ class AzureBlobBucketTest extends AbstractBucketTest { "AZURE_ACCOUNT_NAME" -> "account1", "AZURE_SECRET_KEY" -> "secret" ) - val bucket = Bucket(properties) - assertAzureBlobBucket(bucket) - assert( - bucket - .getConfiguration() - .get("fs.azure.account.key.account1.blob.core.windows.net") === "secret" + val bucket = getBucket(properties) + assertAzureBlobBucket( + bucket, + Map("fs.azure.account.key.account1.blob.core.windows.net" -> "secret") ) } @@ -67,7 +67,7 @@ class AzureBlobBucketTest extends AbstractBucketTest { "AZURE_SAS_TOKEN" -> "token" ) val thrown = intercept[IllegalArgumentException] { - assertAzureBlobBucket(Bucket(properties)) + assertAzureBlobBucket(getBucket(properties), Map.empty[String, String]) } assert(thrown.getMessage === "Please provide a value for the AZURE_CONTAINER_NAME property!") } @@ -78,12 +78,51 @@ class AzureBlobBucketTest extends AbstractBucketTest { "AZURE_SAS_TOKEN" -> "token", "AZURE_CONTAINER_NAME" -> "container1" ) - val bucket = Bucket(properties) - assertAzureBlobBucket(bucket) - assert( - bucket - .getConfiguration() - .get("fs.azure.sas.container1.account1.blob.core.windows.net") === "token" + val bucket = getBucket(properties) + assertAzureBlobBucket( + bucket, + Map("fs.azure.sas.container1.account1.blob.core.windows.net" -> "token") + ) + } + + test("apply returns secret from password of connection object") { + properties = defaultProperties ++ Map( + "AZURE_ACCOUNT_NAME" -> "account1", + "CONNECTION_NAME" -> "connection_info" + ) + val exaMetadata = mockConnectionInfo("", "AZURE_SECRET_KEY=secret") + val bucket = getBucket(properties, exaMetadata) + assertAzureBlobBucket( + bucket, + Map("fs.azure.account.key.account1.blob.core.windows.net" -> "secret") + ) + } + + test("apply returns sas token from password of connection object") { + properties = defaultProperties ++ Map( + "AZURE_ACCOUNT_NAME" -> "account1", + "AZURE_CONTAINER_NAME" -> "container1", + 
"CONNECTION_NAME" -> "connection_info" + ) + val exaMetadata = mockConnectionInfo("", "AZURE_SAS_TOKEN=token") + val bucket = getBucket(properties, exaMetadata) + assertAzureBlobBucket( + bucket, + Map("fs.azure.sas.container1.account1.blob.core.windows.net" -> "token") + ) + } + + test("apply returns sas from connection object if both sas and secret are provided") { + properties = defaultProperties ++ Map( + "AZURE_ACCOUNT_NAME" -> "account1", + "AZURE_CONTAINER_NAME" -> "container1", + "CONNECTION_NAME" -> "connection_info" + ) + val exaMetadata = mockConnectionInfo("", "AZURE_SECRET_KEY=secret;AZURE_SAS_TOKEN=token") + val bucket = getBucket(properties, exaMetadata) + assertAzureBlobBucket( + bucket, + Map("fs.azure.sas.container1.account1.blob.core.windows.net" -> "token") ) } diff --git a/src/test/scala/com/exasol/cloudetl/bucket/BucketTest.scala b/src/test/scala/com/exasol/cloudetl/bucket/BucketTest.scala index 8d9132f6..4388de39 100644 --- a/src/test/scala/com/exasol/cloudetl/bucket/BucketTest.scala +++ b/src/test/scala/com/exasol/cloudetl/bucket/BucketTest.scala @@ -1,7 +1,6 @@ package com.exasol.cloudetl.bucket import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem -import org.apache.hadoop.fs.s3a.S3AFileSystem @SuppressWarnings(Array("org.wartremover.warts.IsInstanceOf")) class BucketTest extends AbstractBucketTest { @@ -9,35 +8,17 @@ class BucketTest extends AbstractBucketTest { test("apply throws if the scheme is not supported") { properties = Map(PATH -> "xyz:/bucket/files*", FORMAT -> "ORC") val thrown = intercept[IllegalArgumentException] { - Bucket(properties) + getBucket(properties) } assert(thrown.getMessage === "Unsupported path scheme xyz!") } test("apply returns LocalBucket") { properties = Map(PATH -> "file://local/path/bucket/", FORMAT -> "ORC") - val bucket = Bucket(properties) + val bucket = getBucket(properties) assert(bucket.isInstanceOf[LocalBucket]) } - test("apply returns S3Bucket") { - properties = Map( - PATH -> "s3a://my-bucket/", - FORMAT -> "ORC", - "S3_ENDPOINT" -> "eu-central-1", - "S3_ACCESS_KEY" -> "abc", - "S3_SECRET_KEY" -> "xyz" - ) - val bucket = Bucket(properties) - val conf = bucket.getConfiguration() - - assert(bucket.isInstanceOf[S3Bucket]) - assert(conf.get("fs.s3a.impl") === classOf[S3AFileSystem].getName) - assert(conf.get("fs.s3a.endpoint") === "eu-central-1") - assert(conf.get("fs.s3a.access.key") === "abc") - assert(conf.get("fs.s3a.secret.key") === "xyz") - } - test("apply returns GCSBucket") { properties = Map( PATH -> "gs://my-bucket/", @@ -45,7 +26,7 @@ class BucketTest extends AbstractBucketTest { "GCS_PROJECT_ID" -> "projX", "GCS_KEYFILE_PATH" -> "/bucketfs/bucket1/projX.json" ) - val bucket = Bucket(properties) + val bucket = getBucket(properties) val conf = bucket.getConfiguration() assert(bucket.isInstanceOf[GCSBucket]) @@ -54,31 +35,4 @@ class BucketTest extends AbstractBucketTest { assert(conf.get("fs.gs.auth.service.account.json.keyfile") === "/bucketfs/bucket1/projX.json") } - test("apply returns AzureAdlsBucket") { - properties = Map( - PATH -> "adl://my_container.azuredatalakestore.net/orc/*", - FORMAT -> "CSV", - "AZURE_CLIENT_ID" -> "clientX", - "AZURE_CLIENT_SECRET" -> "client_secret", - "AZURE_DIRECTORY_ID" -> "directory_id_secret" - ) - val bucket = Bucket(properties) - assert(bucket.isInstanceOf[AzureAdlsBucket]) - - val conf = bucket.getConfiguration() - val expectedSettings = Map( - "fs.adl.impl" -> classOf[org.apache.hadoop.fs.adl.AdlFileSystem].getName, - "fs.AbstractFileSystem.adl.impl" -> 
classOf[org.apache.hadoop.fs.adl.Adl].getName, - "dfs.adls.oauth2.access.token.provider.type" -> "ClientCredential", - "dfs.adls.oauth2.client.id" -> "clientX", - "dfs.adls.oauth2.credential" -> "client_secret", - "dfs.adls.oauth2.refresh.url" -> - "https://login.microsoftonline.com/directory_id_secret/oauth2/token" - ) - expectedSettings.foreach { - case (given, expected) => - assert(conf.get(given) === expected) - } - } - } diff --git a/src/test/scala/com/exasol/cloudetl/bucket/S3BucketTest.scala b/src/test/scala/com/exasol/cloudetl/bucket/S3BucketTest.scala new file mode 100644 index 00000000..99a19fec --- /dev/null +++ b/src/test/scala/com/exasol/cloudetl/bucket/S3BucketTest.scala @@ -0,0 +1,103 @@ +package com.exasol.cloudetl.bucket + +import org.apache.hadoop.fs.s3a.S3AFileSystem + +@SuppressWarnings(Array("org.wartremover.warts.IsInstanceOf")) +class S3BucketTest extends AbstractBucketTest { + + private[this] val defaultProperties = Map( + PATH -> "s3a://my-bucket/", + FORMAT -> "AVRO", + "S3_ENDPOINT" -> "eu-central-1" + ) + + private[this] val accessKey = "access" + private[this] val secretKey = "secret" + private[this] val sessionToken = "token" + + private[this] val accessProperties = defaultProperties ++ Map( + "S3_ACCESS_KEY" -> accessKey, + "S3_SECRET_KEY" -> secretKey, + "S3_SESSION_TOKEN" -> sessionToken + ) + + private[this] val configMappings = Map( + "fs.s3a.access.key" -> accessKey, + "fs.s3a.secret.key" -> secretKey, + "fs.s3a.session.token" -> sessionToken + ) + + private[this] def assertS3Bucket(bucket: Bucket, extraMappings: Map[String, String]): Unit = { + assert(bucket.isInstanceOf[S3Bucket]) + val conf = bucket.getConfiguration() + val defaultMappings = Map( + "fs.s3a.impl" -> classOf[S3AFileSystem].getName, + "fs.s3a.endpoint" -> "eu-central-1" + ) + (defaultMappings ++ extraMappings).foreach { + case (given, expected) => + assert(conf.get(given) === expected) + } + } + + test("apply throws when no secrets nor connection name is provided") { + properties = defaultProperties + val thrown = intercept[IllegalArgumentException] { + assertS3Bucket(getBucket(properties), Map.empty[String, String]) + } + val expected = "Please provide either CONNECTION_NAME property or secure access " + + "credentials parameters, but not the both!" + assert(thrown.getMessage === expected) + } + + test("apply returns S3Bucket with access and secret parameters") { + properties = accessProperties - "S3_SESSION_TOKEN" + val bucket = getBucket(properties) + assertS3Bucket(bucket, configMappings - "fs.s3a.session.token") + } + + test("apply returns S3Bucket with access, secret and session token parameters") { + properties = accessProperties + val bucket = getBucket(properties) + assertS3Bucket(bucket, configMappings) + } + + test("apply returns S3Bucket with secret from connection") { + properties = defaultProperties ++ Map( + "CONNECTION_NAME" -> "connection_info" + ) + val exaMetadata = mockConnectionInfo("access", "S3_SECRET_KEY=secret") + val bucket = getBucket(properties, exaMetadata) + assertS3Bucket(bucket, configMappings - "fs.s3a.session.token") + } + + test("apply returns S3Bucket with secret and session token from connection") { + properties = defaultProperties ++ Map( + "CONNECTION_NAME" -> "connection_info" + ) + val exaMetadata = mockConnectionInfo("access", "S3_SECRET_KEY=secret;S3_SESSION_TOKEN=token") + val bucket = getBucket(properties, exaMetadata) + assertS3Bucket(bucket, configMappings) + } + + // Access key is encoded in password value of connection object. 
+ test("apply returns S3Bucket with access and secret from connection") { + properties = defaultProperties ++ Map( + "CONNECTION_NAME" -> "connection_info" + ) + val exaMetadata = mockConnectionInfo("", "S3_ACCESS_KEY=access;S3_SECRET_KEY=secret") + val bucket = getBucket(properties, exaMetadata) + assertS3Bucket(bucket, configMappings - "fs.s3a.session.token") + } + + test("apply returns S3Bucket with access, secret and session token from connection") { + properties = defaultProperties ++ Map( + "CONNECTION_NAME" -> "connection_info" + ) + val exaMetadata = + mockConnectionInfo("", "S3_ACCESS_KEY=access;S3_SECRET_KEY=secret;S3_SESSION_TOKEN=token") + val bucket = getBucket(properties, exaMetadata) + assertS3Bucket(bucket, configMappings) + } + +} diff --git a/src/test/scala/com/exasol/cloudetl/bucket/SecureBucketTest.scala b/src/test/scala/com/exasol/cloudetl/bucket/SecureBucketTest.scala new file mode 100644 index 00000000..ce547ff4 --- /dev/null +++ b/src/test/scala/com/exasol/cloudetl/bucket/SecureBucketTest.scala @@ -0,0 +1,47 @@ +package com.exasol.cloudetl.bucket + +import com.exasol.ExaMetadata +import com.exasol.cloudetl.storage.StorageProperties + +import org.apache.hadoop.conf.Configuration + +class SecureBucketTest extends AbstractBucketTest { + + test("validate throws if no access credentials are provided") { + val thrown = intercept[IllegalArgumentException] { + BaseSecureBucket(properties).validate() + } + assert(thrown.getMessage.contains("Please provide either CONNECTION_NAME property or")) + } + + test("validate throws if both connection name and access properties are provided") { + properties = Map( + "CONNECTION_NAME" -> "named_connection", + "accountNameProperty" -> "user", + "accountSecretProperty" -> "secret" + ) + val thrown = intercept[IllegalArgumentException] { + BaseSecureBucket(properties).validate() + } + assert(thrown.getMessage.contains("Please provide either CONNECTION_NAME property or")) + assert(thrown.getMessage.contains("secure access credentials parameters, but not the both!")) + } + + @SuppressWarnings(Array("org.wartremover.warts.DefaultArguments")) + private[this] case class BaseSecureBucket( + val params: Map[String, String], + val metadata: ExaMetadata = null + ) extends Bucket + with SecureBucket { + override val properties = StorageProperties(params, metadata) + + override val bucketPath = "local_path" + override def getRequiredProperties: Seq[String] = Seq.empty[String] + override def getSecureProperties: Seq[String] = Seq("accountSecretProperty") + override def getConfiguration: Configuration = new Configuration() + override def validate(): Unit = { + validateRequiredProperties() + validateConnectionProperties() + } + } +} diff --git a/src/test/scala/com/exasol/cloudetl/scriptclasses/ExportPathTest.scala b/src/test/scala/com/exasol/cloudetl/scriptclasses/ExportPathTest.scala index 2307d95d..fa417e66 100644 --- a/src/test/scala/com/exasol/cloudetl/scriptclasses/ExportPathTest.scala +++ b/src/test/scala/com/exasol/cloudetl/scriptclasses/ExportPathTest.scala @@ -36,14 +36,14 @@ class ExportPathTest extends PathTest { } test("generateSqlForExportSpec throws if required property is not set") { - val newProperties = properties - ("S3_ACCESS_KEY") + val newProperties = properties - ("S3_ENDPOINT") when(metadata.getScriptSchema()).thenReturn(schema) when(exportSpec.getParameters()).thenReturn(newProperties.asJava) val thrown = intercept[IllegalArgumentException] { ExportPath.generateSqlForExportSpec(metadata, exportSpec) } - assert(thrown.getMessage 
=== "Please provide a value for the S3_ACCESS_KEY property!") + assert(thrown.getMessage === "Please provide a value for the S3_ENDPOINT property!") verify(exportSpec, times(1)).getParameters verify(exportSpec, never).getSourceColumnNames } diff --git a/src/test/scala/com/exasol/cloudetl/scriptclasses/ImportPathTest.scala b/src/test/scala/com/exasol/cloudetl/scriptclasses/ImportPathTest.scala index da29db6a..dc142b01 100644 --- a/src/test/scala/com/exasol/cloudetl/scriptclasses/ImportPathTest.scala +++ b/src/test/scala/com/exasol/cloudetl/scriptclasses/ImportPathTest.scala @@ -36,14 +36,14 @@ class ImportPathTest extends PathTest { } test("generateSqlForImportSpec throws if required property is not set") { - val newProperties = properties - ("S3_ACCESS_KEY") + val newProperties = properties - ("S3_ENDPOINT") when(metadata.getScriptSchema()).thenReturn(schema) when(importSpec.getParameters()).thenReturn(newProperties.asJava) val thrown = intercept[IllegalArgumentException] { ImportPath.generateSqlForImportSpec(metadata, importSpec) } - assert(thrown.getMessage === "Please provide a value for the S3_ACCESS_KEY property!") + assert(thrown.getMessage === "Please provide a value for the S3_ENDPOINT property!") verify(importSpec, times(1)).getParameters } diff --git a/src/test/scala/com/exasol/cloudetl/sink/BatchSizedSinkTest.scala b/src/test/scala/com/exasol/cloudetl/sink/BatchSizedSinkTest.scala index e439209b..423cc2ae 100644 --- a/src/test/scala/com/exasol/cloudetl/sink/BatchSizedSinkTest.scala +++ b/src/test/scala/com/exasol/cloudetl/sink/BatchSizedSinkTest.scala @@ -42,7 +42,7 @@ class BatchSizedSinkTest extends FunSuite with BeforeAndAfterEach with DummyReco test("export single file with default batch size") { val bucket = LocalBucket( outputPath.toUri.toString, - new StorageProperties(properties ++ Map("EXPORT_BATCH_SIZE" -> "4")) + StorageProperties(properties ++ Map("EXPORT_BATCH_SIZE" -> "4")) ) val sink = new BatchSizedSink(1L, "vm1", 2, columnMetadata, bucket) rows.foreach { row => @@ -55,7 +55,7 @@ class BatchSizedSinkTest extends FunSuite with BeforeAndAfterEach with DummyReco test("export several files with batch size smaller than total records") { val bucket = LocalBucket( outputPath.toUri.toString, - new StorageProperties(properties ++ Map("EXPORT_BATCH_SIZE" -> "3")) + StorageProperties(properties ++ Map("EXPORT_BATCH_SIZE" -> "3")) ) val sink = new BatchSizedSink(1L, "vm1", 7, columnMetadata, bucket) val newRows = rows ++ rows ++ rows ++ rows.take(1) diff --git a/src/test/scala/com/exasol/cloudetl/storage/StoragePropertiesTest.scala b/src/test/scala/com/exasol/cloudetl/storage/StoragePropertiesTest.scala index c455adcc..46d22eed 100644 --- a/src/test/scala/com/exasol/cloudetl/storage/StoragePropertiesTest.scala +++ b/src/test/scala/com/exasol/cloudetl/storage/StoragePropertiesTest.scala @@ -1,9 +1,16 @@ package com.exasol.cloudetl.storage +import com.exasol.ExaConnectionInformation +import com.exasol.ExaMetadata + +import org.mockito.Mockito.times +import org.mockito.Mockito.verify +import org.mockito.Mockito.when import org.scalatest.BeforeAndAfterEach import org.scalatest.FunSuite +import org.scalatest.mockito.MockitoSugar -class StoragePropertiesTest extends FunSuite with BeforeAndAfterEach { +class StoragePropertiesTest extends FunSuite with BeforeAndAfterEach with MockitoSugar { private[this] var properties: Map[String, String] = _ @@ -66,6 +73,38 @@ class StoragePropertiesTest extends FunSuite with BeforeAndAfterEach { 
assert(BaseProperties(properties).getParallelism("nproc()") === "nproc()") } + test("getConnectionInformation throws if Exasol metadata is not provided") { + val thrown = intercept[IllegalArgumentException] { + BaseProperties(properties).getConnectionInformation() + } + assert(thrown.getMessage === "Exasol metadata is None!") + } + + test("getConnectionInformation returns storage connection information") { + properties = Map(StorageProperties.CONNECTION_NAME -> "connection_info") + val metadata = mock[ExaMetadata] + val connectionInfo: ExaConnectionInformation = new ExaConnectionInformation() { + override def getType(): ExaConnectionInformation.ConnectionType = + ExaConnectionInformation.ConnectionType.PASSWORD + override def getAddress(): String = "" + override def getUser(): String = "user" + override def getPassword(): String = "secret" + } + + when(metadata.getConnection("connection_info")).thenReturn(connectionInfo) + assert(StorageProperties(properties, metadata).getConnectionInformation() === connectionInfo) + verify(metadata, times(1)).getConnection("connection_info") + } + + test("hasNamedConnection returns false by default") { + assert(BaseProperties(properties).hasNamedConnection() === false) + } + + test("hasNamedConnection returns true if connection name is set") { + properties = Map(StorageProperties.CONNECTION_NAME -> "named_connection") + assert(BaseProperties(properties).hasNamedConnection() === true) + } + test("mkString returns empty string by default") { val str = BaseProperties(properties).mkString() assert(str.isEmpty === true) @@ -84,6 +123,13 @@ class StoragePropertiesTest extends FunSuite with BeforeAndAfterEach { assert(StorageProperties(properties) === baseProperty) } + test("apply(map) with Exasol metadata returns correct StorageProperties") { + properties = Map("k" -> "v") + val metadata = mock[ExaMetadata] + val expected = StorageProperties(properties, metadata) + assert(StorageProperties(properties, metadata) === expected) + } + test("apply(string) throws if input string does not contain separator") { val thrown = intercept[IllegalArgumentException] { StorageProperties("") @@ -98,7 +144,15 @@ class StoragePropertiesTest extends FunSuite with BeforeAndAfterEach { assert(StorageProperties(mkStringResult) === baseProperty) } + test("apply(string) with Exasol metadata returns correct StorageProperties") { + properties = Map("k1" -> "v1", "k2" -> "v2") + val metadata = mock[ExaMetadata] + val expected = StorageProperties(properties, metadata) + val mkStringResult = BaseProperties(properties).mkString() + assert(StorageProperties(mkStringResult, metadata) === expected) + } + private[this] case class BaseProperties(val params: Map[String, String]) - extends StorageProperties(params) + extends StorageProperties(params, None) }