From a369225ef86dee46032749cf23f0462cce939518 Mon Sep 17 00:00:00 2001 From: Muhammet Orazov Date: Tue, 22 Jan 2019 15:37:55 +0100 Subject: [PATCH 01/20] [skip ci] Add link to latest Github release in readme --- README.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/README.md b/README.md index 9386c130..1b901802 100644 --- a/README.md +++ b/README.md @@ -2,6 +2,7 @@ [![Build Status][travis-badge]][travis-link] [![Codecov][codecov-badge]][codecov-link] +[![GitHub Latest Release][gh-release-badge]][gh-release-link] ###### Please note that this is an open source project which is *not officially supported* by Exasol. We will try to help you as much as possible, but can't guarantee anything since this is not an official Exasol product. @@ -174,6 +175,8 @@ The packaged jar should be located at [travis-link]: https://travis-ci.org/exasol/cloud-storage-etl-udfs [codecov-badge]: https://codecov.io/gh/exasol/cloud-storage-etl-udfs/branch/master/graph/badge.svg [codecov-link]: https://codecov.io/gh/exasol/cloud-storage-etl-udfs +[gh-release-badge]: https://img.shields.io/github/release/exasol/cloud-storage-etl-udfs.svg +[gh-release-link]: https://github.com/exasol/cloud-storage-etl-udfs/releases/latest [exasol]: https://www.exasol.com/en/ [s3]: https://aws.amazon.com/s3/ [gcs]: https://cloud.google.com/storage/ From 1d777fec5e94ba046d3e6cfe027c8260143b57af Mon Sep 17 00:00:00 2001 From: Muhammet Orazov Date: Wed, 23 Jan 2019 14:44:36 +0100 Subject: [PATCH 02/20] Start Exasol table to S3 export as parquet format This commit only adds the ExportPath class. --- .../cloudetl/scriptclasses/ExportPath.scala | 35 +++++++++++++ ...{BaseImportSuite.scala => BaseSuite.scala} | 6 ++- .../scriptclasses/ExportPathSuite.scala | 51 +++++++++++++++++++ .../scriptclasses/ImportFilesSuite.scala | 2 +- .../scriptclasses/ImportMetadataSuite.scala | 2 +- .../scriptclasses/ImportPathSuite.scala | 6 +-- 6 files changed, 94 insertions(+), 8 deletions(-) create mode 100644 src/main/scala/com/exasol/cloudetl/scriptclasses/ExportPath.scala rename src/test/scala/com/exasol/cloudetl/scriptclasses/{BaseImportSuite.scala => BaseSuite.scala} (84%) create mode 100644 src/test/scala/com/exasol/cloudetl/scriptclasses/ExportPathSuite.scala diff --git a/src/main/scala/com/exasol/cloudetl/scriptclasses/ExportPath.scala b/src/main/scala/com/exasol/cloudetl/scriptclasses/ExportPath.scala new file mode 100644 index 00000000..32e45c39 --- /dev/null +++ b/src/main/scala/com/exasol/cloudetl/scriptclasses/ExportPath.scala @@ -0,0 +1,35 @@ +package com.exasol.cloudetl.scriptclasses + +import scala.collection.JavaConverters._ + +import com.exasol.ExaImportSpecification +import com.exasol.ExaMetadata +import com.exasol.cloudetl.bucket._ + +object ExportPath { + + def generateSqlForImportSpec(exaMeta: ExaMetadata, exaSpec: ExaImportSpecification): String = { + val params = exaSpec.getParameters.asScala.toMap + + val bucket = Bucket(params) + + bucket.validate() + + val bucketPath = bucket.bucketPath + val parallelism = Bucket.optionalParam(params, "PARALLELISM", "nproc()") + + val rest = Bucket.mapToStr(params) + + val scriptSchema = exaMeta.getScriptSchema + + s""" + |SELECT + | $scriptSchema.EXPORT_TABLE('$bucketPath', '$rest') + |FROM + | DUAL + |GROUP BY + | $parallelism; + """.stripMargin + } + +} diff --git a/src/test/scala/com/exasol/cloudetl/scriptclasses/BaseImportSuite.scala b/src/test/scala/com/exasol/cloudetl/scriptclasses/BaseSuite.scala similarity index 84% rename from 
src/test/scala/com/exasol/cloudetl/scriptclasses/BaseImportSuite.scala rename to src/test/scala/com/exasol/cloudetl/scriptclasses/BaseSuite.scala index 9f65afe7..e0356cf5 100644 --- a/src/test/scala/com/exasol/cloudetl/scriptclasses/BaseImportSuite.scala +++ b/src/test/scala/com/exasol/cloudetl/scriptclasses/BaseSuite.scala @@ -11,7 +11,7 @@ import org.scalatest.FunSuite import org.scalatest.Matchers import org.scalatest.mockito.MockitoSugar -trait BaseImportSuite extends FunSuite with Matchers with MockitoSugar { +trait BaseSuite extends FunSuite with Matchers with MockitoSugar { val testSchema = "my_schema" @@ -27,6 +27,10 @@ trait BaseImportSuite extends FunSuite with Matchers with MockitoSugar { "S3_SECRET_KEY" -> s3SecretKey ) + val rest = + s"""BUCKET_PATH:=:$s3BucketPath;S3_ENDPOINT:=:$s3Endpoint;""" + + s"""S3_ACCESS_KEY:=:$s3AccessKey;S3_SECRET_KEY:=:$s3SecretKey""" + val resourcePath: String = norm(Paths.get(getClass.getResource("/parquet").toURI)) val resourceBucket: String = s"$resourcePath/*.parquet" diff --git a/src/test/scala/com/exasol/cloudetl/scriptclasses/ExportPathSuite.scala b/src/test/scala/com/exasol/cloudetl/scriptclasses/ExportPathSuite.scala new file mode 100644 index 00000000..f7a3244f --- /dev/null +++ b/src/test/scala/com/exasol/cloudetl/scriptclasses/ExportPathSuite.scala @@ -0,0 +1,51 @@ +package com.exasol.cloudetl.scriptclasses + +import scala.collection.JavaConverters._ + +import com.exasol.ExaImportSpecification +import com.exasol.ExaMetadata + +import org.mockito.Mockito._ + +class ExportPathSuite extends BaseSuite { + + test("`generateSqlForImportSpec` should create a sql statement") { + val exaMeta = mock[ExaMetadata] + val exaSpec = mock[ExaImportSpecification] + + when(exaMeta.getScriptSchema()).thenReturn(testSchema) + when(exaSpec.getParameters()).thenReturn(params.asJava) + + val sqlExpected = + s""" + |SELECT + | $testSchema.EXPORT_TABLE('$s3BucketPath', '$rest') + |FROM + | DUAL + |GROUP BY + | nproc(); + """.stripMargin + + assert(ExportPath.generateSqlForImportSpec(exaMeta, exaSpec).trim === sqlExpected.trim) + verify(exaMeta, atLeastOnce).getScriptSchema + verify(exaSpec, times(1)).getParameters + } + + test("`generateSqlForImportSpec` should throw an exception if any required param is missing") { + val exaMeta = mock[ExaMetadata] + val exaSpec = mock[ExaImportSpecification] + + val newParams = params - ("S3_ACCESS_KEY") + + when(exaMeta.getScriptSchema()).thenReturn(testSchema) + when(exaSpec.getParameters()).thenReturn(newParams.asJava) + + val thrown = intercept[IllegalArgumentException] { + ExportPath.generateSqlForImportSpec(exaMeta, exaSpec) + } + + assert(thrown.getMessage === "The required parameter S3_ACCESS_KEY is not defined!") + verify(exaSpec, times(1)).getParameters + } + +} diff --git a/src/test/scala/com/exasol/cloudetl/scriptclasses/ImportFilesSuite.scala b/src/test/scala/com/exasol/cloudetl/scriptclasses/ImportFilesSuite.scala index 1125f84f..8063d5fd 100644 --- a/src/test/scala/com/exasol/cloudetl/scriptclasses/ImportFilesSuite.scala +++ b/src/test/scala/com/exasol/cloudetl/scriptclasses/ImportFilesSuite.scala @@ -6,7 +6,7 @@ import org.mockito.ArgumentMatchers.any import org.mockito.Mockito._ @SuppressWarnings(Array("org.wartremover.warts.Any")) -class ImportFilesSuite extends BaseImportSuite { +class ImportFilesSuite extends BaseSuite { test("`run` should emit total number of records") { val file1 = s"$resourcePath/sales_positions1.snappy.parquet" diff --git 
a/src/test/scala/com/exasol/cloudetl/scriptclasses/ImportMetadataSuite.scala b/src/test/scala/com/exasol/cloudetl/scriptclasses/ImportMetadataSuite.scala index 7082ef39..3586e821 100644 --- a/src/test/scala/com/exasol/cloudetl/scriptclasses/ImportMetadataSuite.scala +++ b/src/test/scala/com/exasol/cloudetl/scriptclasses/ImportMetadataSuite.scala @@ -5,7 +5,7 @@ import com.exasol.ExaMetadata import org.mockito.ArgumentMatchers.anyString import org.mockito.Mockito._ -class ImportMetadataSuite extends BaseImportSuite { +class ImportMetadataSuite extends BaseSuite { test("`run` should create a list of files names") { val exaIter = commonExaIterator(resourceBucket) diff --git a/src/test/scala/com/exasol/cloudetl/scriptclasses/ImportPathSuite.scala b/src/test/scala/com/exasol/cloudetl/scriptclasses/ImportPathSuite.scala index 9a58e602..86402222 100644 --- a/src/test/scala/com/exasol/cloudetl/scriptclasses/ImportPathSuite.scala +++ b/src/test/scala/com/exasol/cloudetl/scriptclasses/ImportPathSuite.scala @@ -7,7 +7,7 @@ import com.exasol.ExaMetadata import org.mockito.Mockito._ -class ImportPathSuite extends BaseImportSuite { +class ImportPathSuite extends BaseSuite { test("`generateSqlForImportSpec` should create a sql statement") { val exaMeta = mock[ExaMetadata] @@ -16,10 +16,6 @@ class ImportPathSuite extends BaseImportSuite { when(exaMeta.getScriptSchema()).thenReturn(testSchema) when(exaSpec.getParameters()).thenReturn(params.asJava) - val rest = - s"""BUCKET_PATH:=:$s3BucketPath;S3_ENDPOINT:=:$s3Endpoint;""" + - s"""S3_ACCESS_KEY:=:$s3AccessKey;S3_SECRET_KEY:=:$s3SecretKey""" - val sqlExpected = s""" |SELECT From 7fab7da02ea95148bcfa36b8aa52513ac3a4d492 Mon Sep 17 00:00:00 2001 From: Muhammet Orazov Date: Wed, 23 Jan 2019 14:45:42 +0100 Subject: [PATCH 03/20] [skip ci] Update readme with export api description --- README.md | 32 ++++++++++++++++++++++++++++++-- 1 file changed, 30 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 1b901802..90f3f357 100644 --- a/README.md +++ b/README.md @@ -53,6 +53,8 @@ Please change required parameters. CREATE SCHEMA ETL; OPEN SCHEMA ETL; +-- Import related scripts + CREATE OR REPLACE JAVA SET SCRIPT IMPORT_PATH(...) EMITS (...) AS %scriptclass com.exasol.cloudetl.scriptclasses.ImportPath; %jar /buckets/bfsdefault/bucket1/cloud-storage-etl-udfs-{VERSION}.jar; @@ -69,6 +71,18 @@ EMITS (filename VARCHAR(200), partition_index VARCHAR(100)) AS %scriptclass com.exasol.cloudetl.scriptclasses.ImportMetadata; %jar /buckets/bfsdefault/bucket1/cloud-storage-etl-udfs-{VERSION}.jar; / + +-- Export related scripts + +CREATE OR REPLACE JAVA SET SCRIPT EXPORT_PATH(...) EMITS (...) AS +%scriptclass com.exasol.cloudetl.scriptclasses.ExportPath; +%jar /buckets/bfsdefault/bucket1/cloud-storage-etl-udfs-{VERSION}.jar; +/ + +CREATE OR REPLACE JAVA SET SCRIPT EXPORT_TABLE(...) EMITS (ROWS_AFFECTED INT) AS +%scriptclass com.exasol.cloudetl.scriptclasses.ExportTable; +%jar /buckets/bfsdefault/bucket1/cloud-storage-etl-udfs-{VERSION}.jar; +/ ``` ### Import data from cloud storages @@ -78,8 +92,8 @@ Please follow steps below in order to import from cloud strorages. 
#### Create an Exasol schema and table ```sql -CREATE SCHEMA TEST; -OPEN SCHEMA TEST; +CREATE SCHEMA RETAIL; +OPEN SCHEMA RETAIL; DROP TABLE IF EXISTS SALES_POSITIONS; @@ -152,6 +166,20 @@ FROM SCRIPT ETL.IMPORT_PATH WITH SELECT * FROM SALES_POSITIONS LIMIT 10; ``` +#### Export to AWS S3 + +```sql +EXPORT SALES_POSITIONS +INTO SCRIPT ETL.IMPORT_PATH WITH + BUCKET_PATH = 's3a://exa-mo-frankfurt/export/retail/sales_positions/' + S3_ACCESS_KEY = 'MY_AWS_ACCESS_KEY' + S3_SECRET_KEY = 'MY_AWS_SECRET_KEY' + S3_ENDPOINT = 's3.MY_REGION.amazonaws.com' + PARALLELISM = 'nproc()'; + +-- MY_REGION is one of AWS regions, for example, eu-central-1 +``` + ## Building from Source Clone the repository, From c6b29072dd8db53caffefabc44dc2924321226d5 Mon Sep 17 00:00:00 2001 From: Muhammet Orazov Date: Fri, 25 Jan 2019 10:05:22 +0100 Subject: [PATCH 04/20] Add test to check the parquet schema --- .../cloudetl/source/ParquetSourceSuite.scala | 27 ++++++++++++++++--- 1 file changed, 23 insertions(+), 4 deletions(-) diff --git a/src/test/scala/com/exasol/cloudetl/source/ParquetSourceSuite.scala b/src/test/scala/com/exasol/cloudetl/source/ParquetSourceSuite.scala index 1966f21a..d1d542df 100644 --- a/src/test/scala/com/exasol/cloudetl/source/ParquetSourceSuite.scala +++ b/src/test/scala/com/exasol/cloudetl/source/ParquetSourceSuite.scala @@ -6,6 +6,7 @@ import com.exasol.cloudetl.util.FsUtil import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.FileSystem +import org.apache.parquet.schema.MessageTypeParser import org.scalatest.FunSuite import org.scalatest.Matchers @@ -24,12 +25,30 @@ class ParquetSourceSuite extends FunSuite with Matchers { assert(iters.map(_.size).sum === 500) } + private val resourcesDir = salesPosParquetFile.getParent + private val pattern = s"${resourcesDir.toUri.toString}/*.parquet" + test("reads multiple parquet files") { - val resourcesDir = salesPosParquetFile.getParent - val pattern = s"${resourcesDir.toUri.toString}/*.parquet" - val data = ParquetSource(pattern, fs, conf) - val iters = data.stream + val iters = ParquetSource(pattern, fs, conf).stream() assert(iters.map(_.size).sum === 1005) } + test("reads parquet files schema") { + val schema = ParquetSource(pattern, fs, conf).getSchema() + val expectedMsgType = MessageTypeParser + .parseMessageType("""message spark_schema { + | optional int64 sales_id; + | optional int32 position_id; + | optional int32 article_id; + | optional int32 amount; + | optional double price; + | optional int32 voucher_id; + | optional boolean canceled; + |} + """.stripMargin) + + assert(schema.isDefined) + schema.foreach { case msgType => assert(msgType === expectedMsgType) } + } + } From 6faa6c227941225ef3900ba9621f2865e0c9b166 Mon Sep 17 00:00:00 2001 From: Muhammet Orazov Date: Fri, 25 Jan 2019 12:21:41 +0100 Subject: [PATCH 05/20] Minor refactoring and typo fix in tests --- src/main/scala/com/exasol/cloudetl/data/Row.scala | 3 +++ .../exasol/cloudetl/{source => parquet}/ParquetSource.scala | 5 ++--- .../{row/Row.scala => parquet/RowReadSupport.scala} | 6 +++--- .../com/exasol/cloudetl/scriptclasses/ImportFiles.scala | 2 +- .../cloudetl/{source => parquet}/ParquetSourceSuite.scala | 2 +- .../exasol/cloudetl/scriptclasses/ImportFilesSuite.scala | 2 +- 6 files changed, 11 insertions(+), 9 deletions(-) create mode 100644 src/main/scala/com/exasol/cloudetl/data/Row.scala rename src/main/scala/com/exasol/cloudetl/{source => parquet}/ParquetSource.scala (94%) rename src/main/scala/com/exasol/cloudetl/{row/Row.scala => 
parquet/RowReadSupport.scala} (97%) rename src/test/scala/com/exasol/cloudetl/{source => parquet}/ParquetSourceSuite.scala (98%) diff --git a/src/main/scala/com/exasol/cloudetl/data/Row.scala b/src/main/scala/com/exasol/cloudetl/data/Row.scala new file mode 100644 index 00000000..aafe5ec1 --- /dev/null +++ b/src/main/scala/com/exasol/cloudetl/data/Row.scala @@ -0,0 +1,3 @@ +package com.exasol.cloudetl.data + +final case class Row(val values: Seq[Any]) diff --git a/src/main/scala/com/exasol/cloudetl/source/ParquetSource.scala b/src/main/scala/com/exasol/cloudetl/parquet/ParquetSource.scala similarity index 94% rename from src/main/scala/com/exasol/cloudetl/source/ParquetSource.scala rename to src/main/scala/com/exasol/cloudetl/parquet/ParquetSource.scala index ee3083d8..49f6a83d 100644 --- a/src/main/scala/com/exasol/cloudetl/source/ParquetSource.scala +++ b/src/main/scala/com/exasol/cloudetl/parquet/ParquetSource.scala @@ -1,10 +1,9 @@ -package com.exasol.cloudetl.source +package com.exasol.cloudetl.parquet import scala.collection.JavaConverters._ import scala.language.reflectiveCalls -import com.exasol.cloudetl.row.Row -import com.exasol.cloudetl.row.RowReadSupport +import com.exasol.cloudetl.data.Row import com.exasol.cloudetl.util.FsUtil import org.apache.hadoop.conf.Configuration diff --git a/src/main/scala/com/exasol/cloudetl/row/Row.scala b/src/main/scala/com/exasol/cloudetl/parquet/RowReadSupport.scala similarity index 97% rename from src/main/scala/com/exasol/cloudetl/row/Row.scala rename to src/main/scala/com/exasol/cloudetl/parquet/RowReadSupport.scala index a7ec53be..df884275 100644 --- a/src/main/scala/com/exasol/cloudetl/row/Row.scala +++ b/src/main/scala/com/exasol/cloudetl/parquet/RowReadSupport.scala @@ -1,4 +1,6 @@ -package com.exasol.cloudetl.row +package com.exasol.cloudetl.parquet + +import com.exasol.cloudetl.data.Row import org.apache.hadoop.conf.Configuration import org.apache.parquet.hadoop.api.ReadSupport @@ -12,8 +14,6 @@ import org.apache.parquet.schema.GroupType import org.apache.parquet.schema.MessageType import org.apache.parquet.schema.Type -final case class Row(val values: Seq[Any]) - @SuppressWarnings(Array("org.wartremover.contrib.warts.UnsafeInheritance")) class RowReadSupport extends ReadSupport[Row] { diff --git a/src/main/scala/com/exasol/cloudetl/scriptclasses/ImportFiles.scala b/src/main/scala/com/exasol/cloudetl/scriptclasses/ImportFiles.scala index b1a6fc32..ed111c83 100644 --- a/src/main/scala/com/exasol/cloudetl/scriptclasses/ImportFiles.scala +++ b/src/main/scala/com/exasol/cloudetl/scriptclasses/ImportFiles.scala @@ -5,7 +5,7 @@ import scala.collection.mutable.ListBuffer import com.exasol.ExaIterator import com.exasol.ExaMetadata import com.exasol.cloudetl.bucket._ -import com.exasol.cloudetl.source.ParquetSource +import com.exasol.cloudetl.parquet.ParquetSource import com.typesafe.scalalogging.LazyLogging import org.apache.hadoop.conf.Configuration diff --git a/src/test/scala/com/exasol/cloudetl/source/ParquetSourceSuite.scala b/src/test/scala/com/exasol/cloudetl/parquet/ParquetSourceSuite.scala similarity index 98% rename from src/test/scala/com/exasol/cloudetl/source/ParquetSourceSuite.scala rename to src/test/scala/com/exasol/cloudetl/parquet/ParquetSourceSuite.scala index d1d542df..f23f2fb6 100644 --- a/src/test/scala/com/exasol/cloudetl/source/ParquetSourceSuite.scala +++ b/src/test/scala/com/exasol/cloudetl/parquet/ParquetSourceSuite.scala @@ -1,4 +1,4 @@ -package com.exasol.cloudetl.source +package com.exasol.cloudetl.parquet import 
java.nio.file.Paths diff --git a/src/test/scala/com/exasol/cloudetl/scriptclasses/ImportFilesSuite.scala b/src/test/scala/com/exasol/cloudetl/scriptclasses/ImportFilesSuite.scala index 8063d5fd..ef5ebe3b 100644 --- a/src/test/scala/com/exasol/cloudetl/scriptclasses/ImportFilesSuite.scala +++ b/src/test/scala/com/exasol/cloudetl/scriptclasses/ImportFilesSuite.scala @@ -37,7 +37,7 @@ class ImportFilesSuite extends BaseSuite { * +---------+-----------+----------+------+-----+----------+--------+ * */ - test("`run` should emit corrent sequence of records") { + test("`run` should emit correct sequence of records") { val file = s"$resourcePath/sales_positions_small.snappy.parquet" val exaIter = commonExaIterator(resourceBucket) From c268f63624664c5666538d55843fcf3c5cdc5d7d Mon Sep 17 00:00:00 2001 From: Muhammet Orazov Date: Tue, 29 Jan 2019 09:23:12 +0100 Subject: [PATCH 06/20] Fix type `import` -> `export` --- README.md | 2 +- .../exasol/cloudetl/scriptclasses/ExportPath.scala | 4 ++-- .../cloudetl/scriptclasses/ExportPathSuite.scala | 14 +++++++------- 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/README.md b/README.md index 90f3f357..77d2283d 100644 --- a/README.md +++ b/README.md @@ -170,7 +170,7 @@ SELECT * FROM SALES_POSITIONS LIMIT 10; ```sql EXPORT SALES_POSITIONS -INTO SCRIPT ETL.IMPORT_PATH WITH +INTO SCRIPT ETL.EXPORT_PATH WITH BUCKET_PATH = 's3a://exa-mo-frankfurt/export/retail/sales_positions/' S3_ACCESS_KEY = 'MY_AWS_ACCESS_KEY' S3_SECRET_KEY = 'MY_AWS_SECRET_KEY' diff --git a/src/main/scala/com/exasol/cloudetl/scriptclasses/ExportPath.scala b/src/main/scala/com/exasol/cloudetl/scriptclasses/ExportPath.scala index 32e45c39..83011003 100644 --- a/src/main/scala/com/exasol/cloudetl/scriptclasses/ExportPath.scala +++ b/src/main/scala/com/exasol/cloudetl/scriptclasses/ExportPath.scala @@ -2,13 +2,13 @@ package com.exasol.cloudetl.scriptclasses import scala.collection.JavaConverters._ -import com.exasol.ExaImportSpecification +import com.exasol.ExaExportSpecification import com.exasol.ExaMetadata import com.exasol.cloudetl.bucket._ object ExportPath { - def generateSqlForImportSpec(exaMeta: ExaMetadata, exaSpec: ExaImportSpecification): String = { + def generateSqlForExportSpec(exaMeta: ExaMetadata, exaSpec: ExaExportSpecification): String = { val params = exaSpec.getParameters.asScala.toMap val bucket = Bucket(params) diff --git a/src/test/scala/com/exasol/cloudetl/scriptclasses/ExportPathSuite.scala b/src/test/scala/com/exasol/cloudetl/scriptclasses/ExportPathSuite.scala index f7a3244f..6e0a5aee 100644 --- a/src/test/scala/com/exasol/cloudetl/scriptclasses/ExportPathSuite.scala +++ b/src/test/scala/com/exasol/cloudetl/scriptclasses/ExportPathSuite.scala @@ -2,16 +2,16 @@ package com.exasol.cloudetl.scriptclasses import scala.collection.JavaConverters._ -import com.exasol.ExaImportSpecification +import com.exasol.ExaExportSpecification import com.exasol.ExaMetadata import org.mockito.Mockito._ class ExportPathSuite extends BaseSuite { - test("`generateSqlForImportSpec` should create a sql statement") { + test("`generateSqlForExportSpec` should create a sql statement") { val exaMeta = mock[ExaMetadata] - val exaSpec = mock[ExaImportSpecification] + val exaSpec = mock[ExaExportSpecification] when(exaMeta.getScriptSchema()).thenReturn(testSchema) when(exaSpec.getParameters()).thenReturn(params.asJava) @@ -26,14 +26,14 @@ class ExportPathSuite extends BaseSuite { | nproc(); """.stripMargin - assert(ExportPath.generateSqlForImportSpec(exaMeta, exaSpec).trim === 
sqlExpected.trim) + assert(ExportPath.generateSqlForExportSpec(exaMeta, exaSpec).trim === sqlExpected.trim) verify(exaMeta, atLeastOnce).getScriptSchema verify(exaSpec, times(1)).getParameters } - test("`generateSqlForImportSpec` should throw an exception if any required param is missing") { + test("`generateSqlForExportSpec` should throw an exception if any required param is missing") { val exaMeta = mock[ExaMetadata] - val exaSpec = mock[ExaImportSpecification] + val exaSpec = mock[ExaExportSpecification] val newParams = params - ("S3_ACCESS_KEY") @@ -41,7 +41,7 @@ class ExportPathSuite extends BaseSuite { when(exaSpec.getParameters()).thenReturn(newParams.asJava) val thrown = intercept[IllegalArgumentException] { - ExportPath.generateSqlForImportSpec(exaMeta, exaSpec) + ExportPath.generateSqlForExportSpec(exaMeta, exaSpec) } assert(thrown.getMessage === "The required parameter S3_ACCESS_KEY is not defined!") From a81db06f32579e9ce2a2f841e5527bff72a5f311 Mon Sep 17 00:00:00 2001 From: Muhammet Orazov Date: Tue, 29 Jan 2019 14:43:14 +0100 Subject: [PATCH 07/20] Add tests for export path object --- .../cloudetl/scriptclasses/ExportPath.scala | 30 ++++++++++++---- .../cloudetl/scriptclasses/ImportPath.scala | 5 ++- .../scriptclasses/ExportPathSuite.scala | 36 ++++++++++++++++--- .../scriptclasses/ImportPathSuite.scala | 7 ++-- 4 files changed, 60 insertions(+), 18 deletions(-) diff --git a/src/main/scala/com/exasol/cloudetl/scriptclasses/ExportPath.scala b/src/main/scala/com/exasol/cloudetl/scriptclasses/ExportPath.scala index 83011003..ad004c80 100644 --- a/src/main/scala/com/exasol/cloudetl/scriptclasses/ExportPath.scala +++ b/src/main/scala/com/exasol/cloudetl/scriptclasses/ExportPath.scala @@ -10,26 +10,44 @@ object ExportPath { def generateSqlForExportSpec(exaMeta: ExaMetadata, exaSpec: ExaExportSpecification): String = { val params = exaSpec.getParameters.asScala.toMap - val bucket = Bucket(params) bucket.validate() val bucketPath = bucket.bucketPath val parallelism = Bucket.optionalParam(params, "PARALLELISM", "nproc()") - val rest = Bucket.mapToStr(params) val scriptSchema = exaMeta.getScriptSchema - s""" - |SELECT - | $scriptSchema.EXPORT_TABLE('$bucketPath', '$rest') + val srcColumns = getSourceColumns(exaSpec) + val srcColumnsParam = srcColumns.mkString(".") + + s"""SELECT + | $scriptSchema.EXPORT_TABLE( + | '$bucketPath', '$rest', '$srcColumnsParam', ${srcColumns.mkString(", ")} + |) |FROM | DUAL |GROUP BY | $parallelism; - """.stripMargin + |""".stripMargin + } + + private[this] def getSourceColumns(spec: ExaExportSpecification): Seq[String] = + spec.getSourceColumnNames.asScala + .map { + case value => + // Remove quotes if present + getColumnName(value).replaceAll("\"", "") + } + + /** Given a table name dot column name syntax (myTable.colInt), return the column name. 
*/ + private[this] def getColumnName(str: String): String = str.split("\\.") match { + case Array(colName) => colName + case Array(tblName @ _, colName) => colName + case _ => + throw new RuntimeException(s"Could not parse the column name from '$str'!") } } diff --git a/src/main/scala/com/exasol/cloudetl/scriptclasses/ImportPath.scala b/src/main/scala/com/exasol/cloudetl/scriptclasses/ImportPath.scala index 062b1ae1..981615db 100644 --- a/src/main/scala/com/exasol/cloudetl/scriptclasses/ImportPath.scala +++ b/src/main/scala/com/exasol/cloudetl/scriptclasses/ImportPath.scala @@ -22,8 +22,7 @@ object ImportPath { val scriptSchema = exaMeta.getScriptSchema - s""" - |SELECT + s"""SELECT | $scriptSchema.IMPORT_FILES( | '$bucketPath', '$rest', filename |) @@ -34,7 +33,7 @@ object ImportPath { |) |GROUP BY | partition_index; - """.stripMargin + |""".stripMargin } } diff --git a/src/test/scala/com/exasol/cloudetl/scriptclasses/ExportPathSuite.scala b/src/test/scala/com/exasol/cloudetl/scriptclasses/ExportPathSuite.scala index 6e0a5aee..823af5c0 100644 --- a/src/test/scala/com/exasol/cloudetl/scriptclasses/ExportPathSuite.scala +++ b/src/test/scala/com/exasol/cloudetl/scriptclasses/ExportPathSuite.scala @@ -16,19 +16,24 @@ class ExportPathSuite extends BaseSuite { when(exaMeta.getScriptSchema()).thenReturn(testSchema) when(exaSpec.getParameters()).thenReturn(params.asJava) + val srcCols = Seq("tbl.col_int", "c_bool", "c_char") + when(exaSpec.getSourceColumnNames).thenReturn(srcCols.asJava) + val sqlExpected = - s""" - |SELECT - | $testSchema.EXPORT_TABLE('$s3BucketPath', '$rest') + s"""SELECT + | $testSchema.EXPORT_TABLE( + | '$s3BucketPath', '$rest', 'col_int.c_bool.c_char', col_int, c_bool, c_char + |) |FROM | DUAL |GROUP BY | nproc(); - """.stripMargin + |""".stripMargin - assert(ExportPath.generateSqlForExportSpec(exaMeta, exaSpec).trim === sqlExpected.trim) + assert(ExportPath.generateSqlForExportSpec(exaMeta, exaSpec) === sqlExpected) verify(exaMeta, atLeastOnce).getScriptSchema verify(exaSpec, times(1)).getParameters + verify(exaSpec, times(1)).getSourceColumnNames } test("`generateSqlForExportSpec` should throw an exception if any required param is missing") { @@ -46,6 +51,27 @@ class ExportPathSuite extends BaseSuite { assert(thrown.getMessage === "The required parameter S3_ACCESS_KEY is not defined!") verify(exaSpec, times(1)).getParameters + verify(exaSpec, never).getSourceColumnNames + } + + test("`generateSqlForExportSpec` throws if column cannot be parsed (contains extra '.')") { + val exaMeta = mock[ExaMetadata] + val exaSpec = mock[ExaExportSpecification] + + when(exaMeta.getScriptSchema()).thenReturn(testSchema) + when(exaSpec.getParameters()).thenReturn(params.asJava) + + val srcCols = Seq("tbl.c_int.integer") + when(exaSpec.getSourceColumnNames).thenReturn(srcCols.asJava) + + val thrown = intercept[RuntimeException] { + ExportPath.generateSqlForExportSpec(exaMeta, exaSpec) + } + + assert(thrown.getMessage === "Could not parse the column name from 'tbl.c_int.integer'!") + verify(exaMeta, atLeastOnce).getScriptSchema + verify(exaSpec, times(1)).getParameters + verify(exaSpec, times(1)).getSourceColumnNames } } diff --git a/src/test/scala/com/exasol/cloudetl/scriptclasses/ImportPathSuite.scala b/src/test/scala/com/exasol/cloudetl/scriptclasses/ImportPathSuite.scala index 86402222..4c23f156 100644 --- a/src/test/scala/com/exasol/cloudetl/scriptclasses/ImportPathSuite.scala +++ b/src/test/scala/com/exasol/cloudetl/scriptclasses/ImportPathSuite.scala @@ -17,8 +17,7 @@ class 
ImportPathSuite extends BaseSuite { when(exaSpec.getParameters()).thenReturn(params.asJava) val sqlExpected = - s""" - |SELECT + s"""SELECT | $testSchema.IMPORT_FILES( | '$s3BucketPath', '$rest', filename |) @@ -29,9 +28,9 @@ class ImportPathSuite extends BaseSuite { |) |GROUP BY | partition_index; - """.stripMargin + |""".stripMargin - assert(ImportPath.generateSqlForImportSpec(exaMeta, exaSpec).trim === sqlExpected.trim) + assert(ImportPath.generateSqlForImportSpec(exaMeta, exaSpec) === sqlExpected) verify(exaMeta, atLeastOnce).getScriptSchema verify(exaSpec, times(1)).getParameters } From 53dff6b7ca53dc298259bbcb02032a7997dbefe0 Mon Sep 17 00:00:00 2001 From: Muhammet Orazov Date: Wed, 30 Jan 2019 11:09:12 +0100 Subject: [PATCH 08/20] Add exasol columns to parquet message type conversion functions --- .../exasol/cloudetl/data/ExaColumnInfo.scala | 11 ++ .../com/exasol/cloudetl/util/SchemaUtil.scala | 137 ++++++++++++++++++ .../cloudetl/util/SchemaUtilSuite.scala | 96 ++++++++++++ 3 files changed, 244 insertions(+) create mode 100644 src/main/scala/com/exasol/cloudetl/data/ExaColumnInfo.scala create mode 100644 src/main/scala/com/exasol/cloudetl/util/SchemaUtil.scala create mode 100644 src/test/scala/com/exasol/cloudetl/util/SchemaUtilSuite.scala diff --git a/src/main/scala/com/exasol/cloudetl/data/ExaColumnInfo.scala b/src/main/scala/com/exasol/cloudetl/data/ExaColumnInfo.scala new file mode 100644 index 00000000..f89610cc --- /dev/null +++ b/src/main/scala/com/exasol/cloudetl/data/ExaColumnInfo.scala @@ -0,0 +1,11 @@ +package com.exasol.cloudetl.data + +/** An Exasol table column information */ +final case class ExaColumnInfo( + name: String, + `type`: Class[_], + precision: Int, + scale: Int, + length: Int, + isNullable: Boolean +) diff --git a/src/main/scala/com/exasol/cloudetl/util/SchemaUtil.scala b/src/main/scala/com/exasol/cloudetl/util/SchemaUtil.scala new file mode 100644 index 00000000..7315850b --- /dev/null +++ b/src/main/scala/com/exasol/cloudetl/util/SchemaUtil.scala @@ -0,0 +1,137 @@ +package com.exasol.cloudetl.util + +import com.exasol.cloudetl.data.ExaColumnInfo + +import org.apache.parquet.schema.MessageType +import org.apache.parquet.schema.OriginalType +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName +import org.apache.parquet.schema.Type +import org.apache.parquet.schema.Type.Repetition +import org.apache.parquet.schema.Types + +object SchemaUtil { + + // Maps the precision value into the number of bytes + // Adapted from: + // - org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe.java + val PRECISION_TO_BYTE_SIZE: Seq[Int] = { + for { + prec <- 1 to 38 // [1 .. 
38] + power = Math.pow(10, prec.toDouble) // scalastyle:ignore magic.number + size = Math.ceil((Math.log(power - 1) / Math.log(2) + 1) / 8) + } yield size.toInt + } + + /** + * Given the Exasol column information returns Parquet [[org.apache.parquet.schema.MessageType]] + */ + def createParquetMessageType(columns: Seq[ExaColumnInfo], schemaName: String): MessageType = { + val types = columns.map(exaColumnToParquetType(_)) + new MessageType(schemaName, types: _*) + } + + /** + * Given Exasol column [[com.exasol.cloudetl.data.ExaColumnInfo]] information convert it into + * Parquet [[org.apache.parquet.schema.Type$]] + */ + def exaColumnToParquetType(colInfo: ExaColumnInfo): Type = { + val colName = colInfo.name + val colType = colInfo.`type` + val repetition = if (colInfo.isNullable) Repetition.OPTIONAL else Repetition.REQUIRED + + // In below several lines, I try to pattern match on Class[X] of Java types. + // Please also read: + // https://stackoverflow.com/questions/7519140/pattern-matching-on-class-type + object JTypes { + val jInteger: Class[java.lang.Integer] = classOf[java.lang.Integer] + val jLong: Class[java.lang.Long] = classOf[java.lang.Long] + val jBigDecimal: Class[java.math.BigDecimal] = classOf[java.math.BigDecimal] + val jDouble: Class[java.lang.Double] = classOf[java.lang.Double] + val jBoolean: Class[java.lang.Boolean] = classOf[java.lang.Boolean] + val jString: Class[java.lang.String] = classOf[java.lang.String] + val jSqlDate: Class[java.sql.Date] = classOf[java.sql.Date] + val jSqlTimestamp: Class[java.sql.Timestamp] = classOf[java.sql.Timestamp] + } + import JTypes._ + + colType match { + case `jInteger` => + if (colInfo.precision > 0) { + Types + .primitive(PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, repetition) + .precision(colInfo.precision) + .scale(colInfo.scale) + .length(PRECISION_TO_BYTE_SIZE(colInfo.precision - 1)) + .as(OriginalType.DECIMAL) + .named(colName) + } else { + Types + .primitive(PrimitiveTypeName.INT32, repetition) + .named(colName) + } + + case `jLong` => + if (colInfo.precision > 0) { + Types + .primitive(PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, repetition) + .precision(colInfo.precision) + .scale(colInfo.scale) + .length(PRECISION_TO_BYTE_SIZE(colInfo.precision - 1)) + .as(OriginalType.DECIMAL) + .named(colName) + } else { + Types + .primitive(PrimitiveTypeName.INT64, repetition) + .named(colName) + } + + case `jBigDecimal` => + Types + .primitive(PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, repetition) + .precision(colInfo.precision) + .scale(colInfo.scale) + .length(PRECISION_TO_BYTE_SIZE(colInfo.precision - 1)) + .as(OriginalType.DECIMAL) + .named(colName) + + case `jDouble` => + Types + .primitive(PrimitiveTypeName.DOUBLE, repetition) + .named(colName) + + case `jString` => + if (colInfo.length > 0) { + Types + .primitive(PrimitiveTypeName.BINARY, repetition) + .as(OriginalType.UTF8) + .length(colInfo.length) + .named(colName) + } else { + Types + .primitive(PrimitiveTypeName.BINARY, repetition) + .as(OriginalType.UTF8) + .named(colName) + } + + case `jBoolean` => + Types + .primitive(PrimitiveTypeName.BOOLEAN, repetition) + .named(colName) + + case `jSqlDate` => + Types + .primitive(PrimitiveTypeName.INT32, repetition) + .as(OriginalType.DATE) + .named(colName) + + case `jSqlTimestamp` => + Types + .primitive(PrimitiveTypeName.INT96, repetition) + .named(colName) + + case _ => + throw new RuntimeException(s"Cannot convert Exasol type '$colType' to Parquet type.") + } + } + +} diff --git a/src/test/scala/com/exasol/cloudetl/util/SchemaUtilSuite.scala 
b/src/test/scala/com/exasol/cloudetl/util/SchemaUtilSuite.scala new file mode 100644 index 00000000..19cca80b --- /dev/null +++ b/src/test/scala/com/exasol/cloudetl/util/SchemaUtilSuite.scala @@ -0,0 +1,96 @@ +package com.exasol.cloudetl.util + +import com.exasol.cloudetl.data.ExaColumnInfo + +import org.apache.parquet.schema.MessageType +import org.apache.parquet.schema.OriginalType +import org.apache.parquet.schema.PrimitiveType +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName +import org.apache.parquet.schema.Type.Repetition +import org.apache.parquet.schema.Types +import org.scalatest.FunSuite +import org.scalatest.Matchers + +class SchemaUtilSuite extends FunSuite with Matchers { + + test("`createParquetMessageType` throws an exception for unknown type") { + val thrown = intercept[RuntimeException] { + SchemaUtil.createParquetMessageType( + Seq(ExaColumnInfo("c_short", classOf[java.lang.Short], 0, 0, 0, false)), + "test_schema" + ) + } + val expectedMsg = s"Cannot convert Exasol type '${classOf[java.lang.Short]}' to Parquet type." + assert(thrown.getMessage === expectedMsg) + } + + test("`createParquetMessageType` creates parquet message type from list of exa columns") { + + val exaColumns = Seq( + ExaColumnInfo("c_int", classOf[java.lang.Integer], 0, 0, 0, true), + ExaColumnInfo("c_int", classOf[java.lang.Integer], 1, 0, 1, true), + ExaColumnInfo("c_long", classOf[java.lang.Long], 0, 0, 0, false), + ExaColumnInfo("c_long", classOf[java.lang.Long], 7, 3, 4, true), + ExaColumnInfo("c_decimal", classOf[java.math.BigDecimal], 38, 10, 16, false), + ExaColumnInfo("c_double", classOf[java.lang.Double], 0, 0, 0, true), + ExaColumnInfo("c_string", classOf[java.lang.String], 0, 0, 0, false), + ExaColumnInfo("c_string", classOf[java.lang.String], 0, 0, 20, false), + ExaColumnInfo("c_boolean", classOf[java.lang.Boolean], 0, 0, 0, false), + ExaColumnInfo("c_date", classOf[java.sql.Date], 0, 0, 0, false), + ExaColumnInfo("c_timestamp", classOf[java.sql.Timestamp], 0, 0, 0, false) + ) + + val schemaName = "exasol_export_schema" + + val messageType = new MessageType( + schemaName, + new PrimitiveType(Repetition.OPTIONAL, PrimitiveType.PrimitiveTypeName.INT32, "c_int"), + Types + .primitive(PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, Repetition.OPTIONAL) + .precision(1) + .scale(0) + .length(1) + .as(OriginalType.DECIMAL) + .named("c_int"), + new PrimitiveType(Repetition.REQUIRED, PrimitiveType.PrimitiveTypeName.INT64, "c_long"), + Types + .primitive(PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, Repetition.OPTIONAL) + .precision(7) + .scale(3) + .length(4) + .as(OriginalType.DECIMAL) + .named("c_long"), + Types + .primitive(PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, Repetition.REQUIRED) + .precision(38) + .scale(10) + .length(16) + .as(OriginalType.DECIMAL) + .named("c_decimal"), + new PrimitiveType(Repetition.OPTIONAL, PrimitiveType.PrimitiveTypeName.DOUBLE, "c_double"), + new PrimitiveType( + Repetition.REQUIRED, + PrimitiveType.PrimitiveTypeName.BINARY, + "c_string", + OriginalType.UTF8 + ), + Types + .primitive(PrimitiveTypeName.BINARY, Repetition.REQUIRED) + .length(20) + .as(OriginalType.UTF8) + .named("c_string"), + new PrimitiveType( + Repetition.REQUIRED, + PrimitiveType.PrimitiveTypeName.BOOLEAN, + "c_boolean" + ), + Types + .primitive(PrimitiveTypeName.INT32, Repetition.REQUIRED) + .as(OriginalType.DATE) + .named("c_date"), + new PrimitiveType(Repetition.REQUIRED, PrimitiveType.PrimitiveTypeName.INT96, "c_timestamp") + ) + + assert(SchemaUtil.createParquetMessageType(exaColumns, 
schemaName) === messageType) + } +} From a5ded42245749081dfe035618f1c9d5b9187fef7 Mon Sep 17 00:00:00 2001 From: Muhammet Orazov Date: Wed, 30 Jan 2019 16:00:55 +0100 Subject: [PATCH 09/20] Add exaColumnToValue function that returns value from ExaIterator index --- project/Compilation.scala | 1 + .../exasol/cloudetl/data/ExaColumnInfo.scala | 9 ++-- .../com/exasol/cloudetl/util/SchemaUtil.scala | 50 ++++++++++++++----- .../cloudetl/util/SchemaUtilSuite.scala | 48 ++++++++++++++++-- 4 files changed, 86 insertions(+), 22 deletions(-) diff --git a/project/Compilation.scala b/project/Compilation.scala index ab971072..53d8909e 100644 --- a/project/Compilation.scala +++ b/project/Compilation.scala @@ -124,6 +124,7 @@ object Compilation { ) val WartremoverTestFlags: Seq[Wart] = ExtraWartremoverFlags ++ Warts.allBut( + Wart.Any, Wart.NonUnitStatements, Wart.Null ) diff --git a/src/main/scala/com/exasol/cloudetl/data/ExaColumnInfo.scala b/src/main/scala/com/exasol/cloudetl/data/ExaColumnInfo.scala index f89610cc..f0ea6f0c 100644 --- a/src/main/scala/com/exasol/cloudetl/data/ExaColumnInfo.scala +++ b/src/main/scala/com/exasol/cloudetl/data/ExaColumnInfo.scala @@ -1,11 +1,12 @@ package com.exasol.cloudetl.data /** An Exasol table column information */ +@SuppressWarnings(Array("org.wartremover.warts.DefaultArguments")) final case class ExaColumnInfo( name: String, `type`: Class[_], - precision: Int, - scale: Int, - length: Int, - isNullable: Boolean + precision: Int = 0, + scale: Int = 0, + length: Int = 0, + isNullable: Boolean = true ) diff --git a/src/main/scala/com/exasol/cloudetl/util/SchemaUtil.scala b/src/main/scala/com/exasol/cloudetl/util/SchemaUtil.scala index 7315850b..c8bf9d74 100644 --- a/src/main/scala/com/exasol/cloudetl/util/SchemaUtil.scala +++ b/src/main/scala/com/exasol/cloudetl/util/SchemaUtil.scala @@ -1,5 +1,6 @@ package com.exasol.cloudetl.util +import com.exasol.ExaIterator import com.exasol.cloudetl.data.ExaColumnInfo import org.apache.parquet.schema.MessageType @@ -30,6 +31,20 @@ object SchemaUtil { new MessageType(schemaName, types: _*) } + // In below several lines, I try to pattern match on Class[X] of Java types. + // Please also read: + // https://stackoverflow.com/questions/7519140/pattern-matching-on-class-type + object JTypes { + val jInteger: Class[java.lang.Integer] = classOf[java.lang.Integer] + val jLong: Class[java.lang.Long] = classOf[java.lang.Long] + val jBigDecimal: Class[java.math.BigDecimal] = classOf[java.math.BigDecimal] + val jDouble: Class[java.lang.Double] = classOf[java.lang.Double] + val jBoolean: Class[java.lang.Boolean] = classOf[java.lang.Boolean] + val jString: Class[java.lang.String] = classOf[java.lang.String] + val jSqlDate: Class[java.sql.Date] = classOf[java.sql.Date] + val jSqlTimestamp: Class[java.sql.Timestamp] = classOf[java.sql.Timestamp] + } + /** * Given Exasol column [[com.exasol.cloudetl.data.ExaColumnInfo]] information convert it into * Parquet [[org.apache.parquet.schema.Type$]] @@ -39,19 +54,6 @@ object SchemaUtil { val colType = colInfo.`type` val repetition = if (colInfo.isNullable) Repetition.OPTIONAL else Repetition.REQUIRED - // In below several lines, I try to pattern match on Class[X] of Java types. 
- // Please also read: - // https://stackoverflow.com/questions/7519140/pattern-matching-on-class-type - object JTypes { - val jInteger: Class[java.lang.Integer] = classOf[java.lang.Integer] - val jLong: Class[java.lang.Long] = classOf[java.lang.Long] - val jBigDecimal: Class[java.math.BigDecimal] = classOf[java.math.BigDecimal] - val jDouble: Class[java.lang.Double] = classOf[java.lang.Double] - val jBoolean: Class[java.lang.Boolean] = classOf[java.lang.Boolean] - val jString: Class[java.lang.String] = classOf[java.lang.String] - val jSqlDate: Class[java.sql.Date] = classOf[java.sql.Date] - val jSqlTimestamp: Class[java.sql.Timestamp] = classOf[java.sql.Timestamp] - } import JTypes._ colType match { @@ -134,4 +136,26 @@ object SchemaUtil { } } + /** + * Returns a value from Exasol [[ExaIterator]] iterator on given index which have + * [[com.exasol.cloudetl.data.ExaColumnInfo]] column type + */ + def exaColumnToValue(iter: ExaIterator, idx: Int, colInfo: ExaColumnInfo): Any = { + val colType = colInfo.`type` + import JTypes._ + + colType match { + case `jInteger` => iter.getInteger(idx) + case `jLong` => iter.getLong(idx) + case `jBigDecimal` => iter.getBigDecimal(idx) + case `jDouble` => iter.getDouble(idx) + case `jString` => iter.getString(idx) + case `jBoolean` => iter.getBoolean(idx) + case `jSqlDate` => iter.getDate(idx) + case `jSqlTimestamp` => iter.getTimestamp(idx) + case _ => + throw new RuntimeException(s"Cannot get Exasol value for column type '$colType'.") + } + } + } diff --git a/src/test/scala/com/exasol/cloudetl/util/SchemaUtilSuite.scala b/src/test/scala/com/exasol/cloudetl/util/SchemaUtilSuite.scala index 19cca80b..90af5db9 100644 --- a/src/test/scala/com/exasol/cloudetl/util/SchemaUtilSuite.scala +++ b/src/test/scala/com/exasol/cloudetl/util/SchemaUtilSuite.scala @@ -1,17 +1,18 @@ package com.exasol.cloudetl.util +import com.exasol.ExaIterator import com.exasol.cloudetl.data.ExaColumnInfo -import org.apache.parquet.schema.MessageType -import org.apache.parquet.schema.OriginalType -import org.apache.parquet.schema.PrimitiveType +import org.apache.parquet.schema._ import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName import org.apache.parquet.schema.Type.Repetition -import org.apache.parquet.schema.Types +import org.mockito.Mockito._ import org.scalatest.FunSuite import org.scalatest.Matchers +import org.scalatest.mockito.MockitoSugar -class SchemaUtilSuite extends FunSuite with Matchers { +@SuppressWarnings(Array("org.wartremover.contrib.warts.ExposedTuples")) +class SchemaUtilSuite extends FunSuite with Matchers with MockitoSugar { test("`createParquetMessageType` throws an exception for unknown type") { val thrown = intercept[RuntimeException] { @@ -93,4 +94,41 @@ class SchemaUtilSuite extends FunSuite with Matchers { assert(SchemaUtil.createParquetMessageType(exaColumns, schemaName) === messageType) } + + test("`exaColumnToValue` returns value with column type") { + val iter = mock[ExaIterator] + val startIdx = 3 + val bd = new java.math.BigDecimal(1337) + val dt = new java.sql.Date(System.currentTimeMillis()) + val ts = new java.sql.Timestamp(System.currentTimeMillis()) + + when(iter.getInteger(3)).thenReturn(1) + when(iter.getLong(4)).thenReturn(3L) + when(iter.getBigDecimal(5)).thenReturn(bd) + when(iter.getDouble(6)).thenReturn(3.14) + when(iter.getString(7)).thenReturn("xyz") + when(iter.getBoolean(8)).thenReturn(true) + when(iter.getDate(9)).thenReturn(dt) + when(iter.getTimestamp(10)).thenReturn(ts) + + val data = Seq( + 1 -> ExaColumnInfo("c_int", 
classOf[java.lang.Integer]), + 3L -> ExaColumnInfo("c_long", classOf[java.lang.Long]), + bd -> ExaColumnInfo("c_decimal", classOf[java.math.BigDecimal]), + 3.14 -> ExaColumnInfo("c_double", classOf[java.lang.Double]), + "xyz" -> ExaColumnInfo("c_string", classOf[java.lang.String]), + true -> ExaColumnInfo("c_boolean", classOf[java.lang.Boolean]), + dt -> ExaColumnInfo("c_date", classOf[java.sql.Date]), + ts -> ExaColumnInfo("c_timestamp", classOf[java.sql.Timestamp]) + ) + + data.zipWithIndex.map { + case ((expectedValue, col), idx) => + val nxtIdx = startIdx + idx + val ret = SchemaUtil.exaColumnToValue(iter, nxtIdx, col) + assert(ret === expectedValue) + assert(ret.getClass === col.`type`) + } + + } } From 81b119b3831d5258cd8fd5aee49717e9c0473b8a Mon Sep 17 00:00:00 2001 From: Muhammet Orazov Date: Fri, 1 Feb 2019 13:13:15 +0100 Subject: [PATCH 10/20] Restructure resource test data files --- .../import}/parquet/sales_positions1.snappy.parquet | Bin .../import}/parquet/sales_positions2.snappy.parquet | Bin .../parquet/sales_positions_small.snappy.parquet | Bin .../cloudetl/parquet/ParquetSourceSuite.scala | 2 +- .../exasol/cloudetl/scriptclasses/BaseSuite.scala | 5 +++-- .../cloudetl/scriptclasses/ImportFilesSuite.scala | 10 +++++----- .../scriptclasses/ImportMetadataSuite.scala | 11 +++++++---- 7 files changed, 16 insertions(+), 12 deletions(-) rename src/test/resources/{ => data/import}/parquet/sales_positions1.snappy.parquet (100%) rename src/test/resources/{ => data/import}/parquet/sales_positions2.snappy.parquet (100%) rename src/test/resources/{ => data/import}/parquet/sales_positions_small.snappy.parquet (100%) diff --git a/src/test/resources/parquet/sales_positions1.snappy.parquet b/src/test/resources/data/import/parquet/sales_positions1.snappy.parquet similarity index 100% rename from src/test/resources/parquet/sales_positions1.snappy.parquet rename to src/test/resources/data/import/parquet/sales_positions1.snappy.parquet diff --git a/src/test/resources/parquet/sales_positions2.snappy.parquet b/src/test/resources/data/import/parquet/sales_positions2.snappy.parquet similarity index 100% rename from src/test/resources/parquet/sales_positions2.snappy.parquet rename to src/test/resources/data/import/parquet/sales_positions2.snappy.parquet diff --git a/src/test/resources/parquet/sales_positions_small.snappy.parquet b/src/test/resources/data/import/parquet/sales_positions_small.snappy.parquet similarity index 100% rename from src/test/resources/parquet/sales_positions_small.snappy.parquet rename to src/test/resources/data/import/parquet/sales_positions_small.snappy.parquet diff --git a/src/test/scala/com/exasol/cloudetl/parquet/ParquetSourceSuite.scala b/src/test/scala/com/exasol/cloudetl/parquet/ParquetSourceSuite.scala index f23f2fb6..63856e6d 100644 --- a/src/test/scala/com/exasol/cloudetl/parquet/ParquetSourceSuite.scala +++ b/src/test/scala/com/exasol/cloudetl/parquet/ParquetSourceSuite.scala @@ -17,7 +17,7 @@ class ParquetSourceSuite extends FunSuite with Matchers { private val fs = FileSystem.get(conf) private val salesPosParquetFile = - Paths.get(getClass.getResource("/parquet/sales_positions1.snappy.parquet").toURI) + Paths.get(getClass.getResource("/data/import/parquet/sales_positions1.snappy.parquet").toURI) test("reads a single parquet file") { val data = ParquetSource(FsUtil.globWithLocal(salesPosParquetFile, fs), fs, conf) diff --git a/src/test/scala/com/exasol/cloudetl/scriptclasses/BaseSuite.scala b/src/test/scala/com/exasol/cloudetl/scriptclasses/BaseSuite.scala index 
e0356cf5..e319e75c 100644 --- a/src/test/scala/com/exasol/cloudetl/scriptclasses/BaseSuite.scala +++ b/src/test/scala/com/exasol/cloudetl/scriptclasses/BaseSuite.scala @@ -31,8 +31,9 @@ trait BaseSuite extends FunSuite with Matchers with MockitoSugar { s"""BUCKET_PATH:=:$s3BucketPath;S3_ENDPOINT:=:$s3Endpoint;""" + s"""S3_ACCESS_KEY:=:$s3AccessKey;S3_SECRET_KEY:=:$s3SecretKey""" - val resourcePath: String = norm(Paths.get(getClass.getResource("/parquet").toURI)) - val resourceBucket: String = s"$resourcePath/*.parquet" + val resourcePath: String = norm(Paths.get(getClass.getResource("/data").toURI)) + val resourceImportBucket: String = s"$resourcePath/import/parquet/*.parquet" + val resourceExportBucket: String = s"$resourcePath/export/parquet/" final def norm(path: Path): String = path.toUri.toString.replaceAll("/$", "").replaceAll("///", "/") diff --git a/src/test/scala/com/exasol/cloudetl/scriptclasses/ImportFilesSuite.scala b/src/test/scala/com/exasol/cloudetl/scriptclasses/ImportFilesSuite.scala index ef5ebe3b..56784747 100644 --- a/src/test/scala/com/exasol/cloudetl/scriptclasses/ImportFilesSuite.scala +++ b/src/test/scala/com/exasol/cloudetl/scriptclasses/ImportFilesSuite.scala @@ -9,10 +9,10 @@ import org.mockito.Mockito._ class ImportFilesSuite extends BaseSuite { test("`run` should emit total number of records") { - val file1 = s"$resourcePath/sales_positions1.snappy.parquet" - val file2 = s"$resourcePath/sales_positions2.snappy.parquet" + val file1 = s"$resourcePath/import/parquet/sales_positions1.snappy.parquet" + val file2 = s"$resourcePath/import/parquet/sales_positions2.snappy.parquet" - val exaIter = commonExaIterator(resourceBucket) + val exaIter = commonExaIterator(resourceImportBucket) when(exaIter.next()).thenReturn(true, false) when(exaIter.getString(2)).thenReturn(file1, file2) @@ -38,9 +38,9 @@ class ImportFilesSuite extends BaseSuite { * */ test("`run` should emit correct sequence of records") { - val file = s"$resourcePath/sales_positions_small.snappy.parquet" + val file = s"$resourcePath/import/parquet/sales_positions_small.snappy.parquet" - val exaIter = commonExaIterator(resourceBucket) + val exaIter = commonExaIterator(resourceImportBucket) when(exaIter.next()).thenReturn(false) when(exaIter.getString(2)).thenReturn(file) diff --git a/src/test/scala/com/exasol/cloudetl/scriptclasses/ImportMetadataSuite.scala b/src/test/scala/com/exasol/cloudetl/scriptclasses/ImportMetadataSuite.scala index 3586e821..35193c20 100644 --- a/src/test/scala/com/exasol/cloudetl/scriptclasses/ImportMetadataSuite.scala +++ b/src/test/scala/com/exasol/cloudetl/scriptclasses/ImportMetadataSuite.scala @@ -8,15 +8,18 @@ import org.mockito.Mockito._ class ImportMetadataSuite extends BaseSuite { test("`run` should create a list of files names") { - val exaIter = commonExaIterator(resourceBucket) + val exaIter = commonExaIterator(resourceImportBucket) when(exaIter.getInteger(2)).thenReturn(2) ImportMetadata.run(mock[ExaMetadata], exaIter) verify(exaIter, times(3)).emit(anyString(), anyString()) - verify(exaIter, times(1)).emit(s"$resourcePath/sales_positions1.snappy.parquet", "0") - verify(exaIter, times(1)).emit(s"$resourcePath/sales_positions2.snappy.parquet", "1") - verify(exaIter, times(1)).emit(s"$resourcePath/sales_positions_small.snappy.parquet", "0") + verify(exaIter, times(1)) + .emit(s"$resourcePath/import/parquet/sales_positions1.snappy.parquet", "0") + verify(exaIter, times(1)) + .emit(s"$resourcePath/import/parquet/sales_positions2.snappy.parquet", "1") + verify(exaIter, 
times(1)) + .emit(s"$resourcePath/import/parquet/sales_positions_small.snappy.parquet", "0") } } From 661e6f609e58b1e9cda487975cf196036c3da228 Mon Sep 17 00:00:00 2001 From: Muhammet Orazov Date: Fri, 1 Feb 2019 13:16:28 +0100 Subject: [PATCH 11/20] Add tests for schema conversion for unsupported type --- src/main/scala/com/exasol/cloudetl/util/SchemaUtil.scala | 6 ++++-- .../scala/com/exasol/cloudetl/util/SchemaUtilSuite.scala | 9 ++++++++- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/src/main/scala/com/exasol/cloudetl/util/SchemaUtil.scala b/src/main/scala/com/exasol/cloudetl/util/SchemaUtil.scala index c8bf9d74..3cc55bff 100644 --- a/src/main/scala/com/exasol/cloudetl/util/SchemaUtil.scala +++ b/src/main/scala/com/exasol/cloudetl/util/SchemaUtil.scala @@ -132,7 +132,9 @@ object SchemaUtil { .named(colName) case _ => - throw new RuntimeException(s"Cannot convert Exasol type '$colType' to Parquet type.") + throw new IllegalArgumentException( + s"Cannot convert Exasol type '$colType' to Parquet type." + ) } } @@ -154,7 +156,7 @@ object SchemaUtil { case `jSqlDate` => iter.getDate(idx) case `jSqlTimestamp` => iter.getTimestamp(idx) case _ => - throw new RuntimeException(s"Cannot get Exasol value for column type '$colType'.") + throw new IllegalArgumentException(s"Cannot get Exasol value for column type '$colType'.") } } diff --git a/src/test/scala/com/exasol/cloudetl/util/SchemaUtilSuite.scala b/src/test/scala/com/exasol/cloudetl/util/SchemaUtilSuite.scala index 90af5db9..a028ed39 100644 --- a/src/test/scala/com/exasol/cloudetl/util/SchemaUtilSuite.scala +++ b/src/test/scala/com/exasol/cloudetl/util/SchemaUtilSuite.scala @@ -15,7 +15,7 @@ import org.scalatest.mockito.MockitoSugar class SchemaUtilSuite extends FunSuite with Matchers with MockitoSugar { test("`createParquetMessageType` throws an exception for unknown type") { - val thrown = intercept[RuntimeException] { + val thrown = intercept[IllegalArgumentException] { SchemaUtil.createParquetMessageType( Seq(ExaColumnInfo("c_short", classOf[java.lang.Short], 0, 0, 0, false)), "test_schema" @@ -130,5 +130,12 @@ class SchemaUtilSuite extends FunSuite with Matchers with MockitoSugar { assert(ret.getClass === col.`type`) } + val thrown = intercept[IllegalArgumentException] { + SchemaUtil.exaColumnToValue(iter, 0, ExaColumnInfo("c_short", classOf[java.lang.Short])) + } + assert( + thrown.getMessage === "Cannot get Exasol value for column type 'class java.lang.Short'." 
+ ) + } } From 3932d77f1e5ab7195356d27b87eafd6f7057e39b Mon Sep 17 00:00:00 2001 From: Muhammet Orazov Date: Fri, 1 Feb 2019 16:02:40 +0100 Subject: [PATCH 12/20] Add initial export functionality - Adds ParquetWriter - Adds RowWriteSupport However, there are still some more changes needed: - Decide on decimal to int32 or int64 based on precision - Improve the import functionality with date and timestamps --- .../cloudetl/parquet/ParquetRowWriter.scala | 41 +++ .../parquet/ParquetWriteOptions.scala | 35 +++ .../cloudetl/parquet/RowWriteSupport.scala | 245 ++++++++++++++++++ .../cloudetl/scriptclasses/ExportTable.scala | 98 +++++++ .../cloudetl/scriptclasses/BaseSuite.scala | 1 - .../scriptclasses/ExportTableSuite.scala | 146 +++++++++++ src/test/scala/org/mockito/ExtraMockito.scala | 28 ++ 7 files changed, 593 insertions(+), 1 deletion(-) create mode 100644 src/main/scala/com/exasol/cloudetl/parquet/ParquetRowWriter.scala create mode 100644 src/main/scala/com/exasol/cloudetl/parquet/ParquetWriteOptions.scala create mode 100644 src/main/scala/com/exasol/cloudetl/parquet/RowWriteSupport.scala create mode 100644 src/main/scala/com/exasol/cloudetl/scriptclasses/ExportTable.scala create mode 100644 src/test/scala/com/exasol/cloudetl/scriptclasses/ExportTableSuite.scala create mode 100644 src/test/scala/org/mockito/ExtraMockito.scala diff --git a/src/main/scala/com/exasol/cloudetl/parquet/ParquetRowWriter.scala b/src/main/scala/com/exasol/cloudetl/parquet/ParquetRowWriter.scala new file mode 100644 index 00000000..a07e72e4 --- /dev/null +++ b/src/main/scala/com/exasol/cloudetl/parquet/ParquetRowWriter.scala @@ -0,0 +1,41 @@ +package com.exasol.cloudetl.parquet + +import com.exasol.cloudetl.data.Row + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path +import org.apache.parquet.column.ParquetProperties +import org.apache.parquet.hadoop.ParquetFileWriter +import org.apache.parquet.hadoop.ParquetWriter +import org.apache.parquet.hadoop.api.WriteSupport +import org.apache.parquet.schema.MessageType + +object ParquetRowWriter { + + private[this] class Builder(path: Path, messageType: MessageType) + extends ParquetWriter.Builder[Row, Builder](path) { + + override def getWriteSupport(conf: Configuration): WriteSupport[Row] = + new RowWriteSupport(messageType) + + override def self(): Builder = this + } + + def apply( + path: Path, + conf: Configuration, + messageType: MessageType, + options: ParquetWriteOptions + ): ParquetWriter[Row] = + new Builder(path, messageType) + .withRowGroupSize(options.blockSize) + .withPageSize(options.pageSize) + .withCompressionCodec(options.compressionCodec) + .withDictionaryEncoding(options.enableDictionaryEncoding) + .withValidation(options.enableValidation) + .withWriteMode(ParquetFileWriter.Mode.CREATE) + .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_1_0) + .withConf(conf) + .build() + +} diff --git a/src/main/scala/com/exasol/cloudetl/parquet/ParquetWriteOptions.scala b/src/main/scala/com/exasol/cloudetl/parquet/ParquetWriteOptions.scala new file mode 100644 index 00000000..9ad6db03 --- /dev/null +++ b/src/main/scala/com/exasol/cloudetl/parquet/ParquetWriteOptions.scala @@ -0,0 +1,35 @@ +package com.exasol.cloudetl.parquet + +import org.apache.parquet.hadoop.ParquetWriter +import org.apache.parquet.hadoop.metadata.CompressionCodecName + +final case class ParquetWriteOptions( + blockSize: Int, + pageSize: Int, + compressionCodec: CompressionCodecName, + enableDictionaryEncoding: Boolean, + enableValidation: Boolean +) + +object 
ParquetWriteOptions { + + @SuppressWarnings( + Array("org.wartremover.warts.Overloading", "org.danielnixon.extrawarts.StringOpsPartial") + ) + def apply(params: Map[String, String]): ParquetWriteOptions = { + val compressionCodec = params.getOrElse("PARQUET_COMPRESSION_CODEC", "").toUpperCase() match { + case "SNAPPY" => CompressionCodecName.SNAPPY + case "GZIP" => CompressionCodecName.GZIP + case "LZO" => CompressionCodecName.LZO + case _ => CompressionCodecName.UNCOMPRESSED + } + val blockSize = + params.get("PARQUET_BLOCK_SIZE").fold(ParquetWriter.DEFAULT_BLOCK_SIZE)(_.toInt) + val pageSize = params.get("PARQUET_PAGE_SIZE").fold(ParquetWriter.DEFAULT_PAGE_SIZE)(_.toInt) + val dictionary = params.get("PARQUET_DICTIONARY_ENCODING").fold(true)(_.toBoolean) + val validation = params.get("PARQUET_VALIDAIONT").fold(true)(_.toBoolean) + + ParquetWriteOptions(blockSize, pageSize, compressionCodec, dictionary, validation) + } + +} diff --git a/src/main/scala/com/exasol/cloudetl/parquet/RowWriteSupport.scala b/src/main/scala/com/exasol/cloudetl/parquet/RowWriteSupport.scala new file mode 100644 index 00000000..ba13979b --- /dev/null +++ b/src/main/scala/com/exasol/cloudetl/parquet/RowWriteSupport.scala @@ -0,0 +1,245 @@ +package com.exasol.cloudetl.parquet + +import java.nio.ByteBuffer +import java.nio.ByteOrder + +import scala.collection.JavaConverters._ + +import com.exasol.cloudetl.data.Row +import com.exasol.cloudetl.util.SchemaUtil + +import org.apache.hadoop.conf.Configuration +import org.apache.parquet.hadoop.api.WriteSupport +import org.apache.parquet.hadoop.api.WriteSupport.FinalizedWriteContext +import org.apache.parquet.io.api.Binary +import org.apache.parquet.io.api.RecordConsumer +import org.apache.parquet.schema.MessageType +import org.apache.parquet.schema.OriginalType +import org.apache.parquet.schema.PrimitiveType +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName + +@SuppressWarnings( + Array( + "org.wartremover.warts.AsInstanceOf", + "org.wartremover.warts.Null", + "org.wartremover.warts.Var" + ) +) +class RowWriteSupport(schema: MessageType) extends WriteSupport[Row] { + + private final val DECIMAL_MAX_PRECISION: Int = 38 + // private final val DECIMAL_MAX_INT_DIGITS: Int = 9 + // private final val DECIMAL_MAX_LONG_DIGITS: Int = 18 + private final val TIMESTAMP_MAX_BYTE_SIZE: Int = 12 + + private type RowValueWriter = (Row, Int) => Unit + + private var rootFieldWriters: Array[RowValueWriter] = _ + + private var recordConsumer: RecordConsumer = _ + + // Reusable byte array used to write timestamps as Parquet INT96 values + private val timestampBuffer = new Array[Byte](TIMESTAMP_MAX_BYTE_SIZE) + + // Reusable byte array used to write decimal values as Parquet FIXED_LEN_BYTE_ARRAY values + private val decimalBuffer = + new Array[Byte](SchemaUtil.PRECISION_TO_BYTE_SIZE(DECIMAL_MAX_PRECISION - 1)) + + final override def init(configuration: Configuration): WriteSupport.WriteContext = { + this.rootFieldWriters = schema.getFields.asScala + .map { + case field => + makeWriter(field.asInstanceOf[PrimitiveType]) + } + .toArray[RowValueWriter] + + new WriteSupport.WriteContext(schema, new java.util.HashMap()) + } + + final override def prepareForWrite(record: RecordConsumer): Unit = + this.recordConsumer = record + + final override def write(row: Row): Unit = + consumeMessage { + writeFields(row, schema, rootFieldWriters) + } + + final override def finalizeWrite(): FinalizedWriteContext = + new FinalizedWriteContext(new java.util.HashMap()) + + private def writeFields(row: 
Row, schema: MessageType, writers: Array[RowValueWriter]): Unit = { + var idx = 0 + while (idx < schema.getFieldCount) { + val fieldType = schema.getType(idx) + val fieldName = fieldType.getName() + if (row.values(idx) != null) { + consumeField(fieldName, idx) { + writers(idx).apply(row, idx) + } + } + idx += 1 + } + } + + private def consumeMessage(fn: => Unit): Unit = { + recordConsumer.startMessage() + fn + recordConsumer.endMessage() + } + + private def consumeField(field: String, index: Int)(fn: => Unit): Unit = { + recordConsumer.startField(field, index) + fn + recordConsumer.endField(field, index) + } + + private def makeWriter(primitiveType: PrimitiveType): RowValueWriter = { + val typeName = primitiveType.getPrimitiveTypeName + val originalType = primitiveType.getOriginalType + + typeName match { + case PrimitiveTypeName.BOOLEAN => + (row: Row, index: Int) => + recordConsumer.addBoolean(row.values(index).asInstanceOf[Boolean]) + + case PrimitiveTypeName.INT32 => + originalType match { + case OriginalType.DATE => + makeDateWriter() + case _ => + (row: Row, index: Int) => + recordConsumer.addInteger(row.values(index).asInstanceOf[Integer]) + } + + case PrimitiveTypeName.INT64 => + (row: Row, index: Int) => + recordConsumer.addLong(row.values(index).asInstanceOf[Long]) + + case PrimitiveTypeName.FLOAT => + (row: Row, index: Int) => + recordConsumer.addFloat(row.values(index).asInstanceOf[Double].floatValue) + + case PrimitiveTypeName.DOUBLE => + (row: Row, index: Int) => + recordConsumer.addDouble(row.values(index).asInstanceOf[Double]) + + case PrimitiveTypeName.BINARY => + (row: Row, index: Int) => + recordConsumer.addBinary( + Binary.fromReusedByteArray(row.values(index).asInstanceOf[String].getBytes) + ) + + case PrimitiveTypeName.INT96 => + makeTimestampWriter() + + case PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY if originalType == OriginalType.DECIMAL => + val decimalMetadata = primitiveType.getDecimalMetadata + makeDecimalWriter(decimalMetadata.getPrecision, decimalMetadata.getScale) + + case _ => throw new UnsupportedOperationException(s"Unsupported parquet type '$typeName'.") + } + } + + private def makeDateWriter(): RowValueWriter = (row: Row, index: Int) => { + import java.time._ + import java.time.temporal.ChronoUnit + + val UnixEpoch = LocalDate.of(1970, 1, 1) // scalastyle:ignore magic.number + + // Write the number of days since unix epoch as integer + val date = row.values(index).asInstanceOf[java.sql.Date] + val localDate = Instant.ofEpochMilli(date.getTime).atZone(ZoneId.of("UTC")).toLocalDate + val days = ChronoUnit.DAYS.between(UnixEpoch, localDate) + + recordConsumer.addInteger(days.toInt) + } + + private def makeTimestampWriter(): RowValueWriter = (row: Row, index: Int) => { + import java.time._ + import java.time.temporal.ChronoUnit + + val JulianEpochInGregorian = + LocalDateTime.of(-4713, 11, 24, 0, 0, 0) // scalastyle:ignore magic.number + + val timestamp = row.values(index).asInstanceOf[java.sql.Timestamp] + val dt = Instant.ofEpochMilli(timestamp.getTime).atZone(ZoneId.of("UTC")) + val days = ChronoUnit.DAYS.between(JulianEpochInGregorian, dt).toInt + val nanos = timestamp.getNanos + ChronoUnit.NANOS + .between(dt.toLocalDate.atStartOfDay(ZoneId.of("UTC")).toLocalTime, dt.toLocalTime) + + val buf = ByteBuffer.wrap(timestampBuffer) + val _ = buf.order(ByteOrder.LITTLE_ENDIAN).putLong(nanos).putInt(days) + + recordConsumer.addBinary(Binary.fromReusedByteArray(timestampBuffer)) + } + + private def makeDecimalWriter(precision: Int, scale: Int): RowValueWriter = { 
+ require( + precision >= 1, + s"Decimal precision $precision should not be less than minimum precision 1" + ) + require( + precision <= DECIMAL_MAX_PRECISION, + s"Decimal precision $precision should not exceed max precision $DECIMAL_MAX_PRECISION" + ) + + // The number of bytes from given the precision + val numBytes = SchemaUtil.PRECISION_TO_BYTE_SIZE(precision - 1) + + // val int32Writer = (row: Row, index: Int) => { + // val bigDecimalInt = + // row + // .values(index) + // .asInstanceOf[java.math.BigDecimal] + // .unscaledValue() + // .longValueExact() + // .toInt + // recordConsumer.addInteger(bigDecimalInt) + // } + + // val int64Writer = (row: Row, index: Int) => { + // val bigDecimalLong = + // row.values(index).asInstanceOf[java.math.BigDecimal].unscaledValue().longValueExact() + // recordConsumer.addLong(bigDecimalLong) + // } + + val bytesWriter = (row: Row, index: Int) => { + val decimal = row.values(index).asInstanceOf[java.math.BigDecimal] + val unscaled = decimal.unscaledValue() + val bytes = unscaled.toByteArray + val fixedLenBytesArray = + if (bytes.length == numBytes) { + // If the length of the underlying byte array of the unscaled `BigDecimal` happens to be + // `numBytes`, just reuse it, so that we don't bother copying it to `decimalBuffer`. + bytes + } else if (bytes.length < numBytes) { + // Otherwise, the length must be less than `numBytes`. In this case we copy contents of + // the underlying bytes with padding sign bytes to `decimalBuffer` to form the result + // fixed-length byte array. + + // For negatives all high bits need to be 1 hence -1 used + val signByte = if (unscaled.signum < 0) -1: Byte else 0: Byte + java.util.Arrays.fill(decimalBuffer, 0, numBytes - bytes.length, signByte) + System.arraycopy(bytes, 0, decimalBuffer, numBytes - bytes.length, bytes.length) + decimalBuffer + } else { + throw new IllegalStateException( + s"The precision $precision is too small for decimal value." 
+ ) + } + + recordConsumer.addBinary(Binary.fromReusedByteArray(fixedLenBytesArray, 0, numBytes)) + } + + // if (precision <= DECIMAL_MAX_INT_DIGITS) { // 1 <= precision <= 9, writes as INT32 + // int32Writer + // } else if (precision <= DECIMAL_MAX_LONG_DIGITS) { // 10 <= precision <= 18, writes as INT64 + // int64Writer + // } else { // 19 <= precision <= 38, writes as FIXED_LEN_BYTE_ARRAY + // bytesWriter + // } + + bytesWriter + } + +} diff --git a/src/main/scala/com/exasol/cloudetl/scriptclasses/ExportTable.scala b/src/main/scala/com/exasol/cloudetl/scriptclasses/ExportTable.scala new file mode 100644 index 00000000..9bf9687b --- /dev/null +++ b/src/main/scala/com/exasol/cloudetl/scriptclasses/ExportTable.scala @@ -0,0 +1,98 @@ +package com.exasol.cloudetl.scriptclasses + +import java.util.UUID + +import scala.collection.mutable.ListBuffer + +import com.exasol.ExaIterator +import com.exasol.ExaMetadata +import com.exasol.cloudetl.bucket.Bucket +import com.exasol.cloudetl.data.ExaColumnInfo +import com.exasol.cloudetl.data.Row +import com.exasol.cloudetl.parquet.ParquetRowWriter +import com.exasol.cloudetl.parquet.ParquetWriteOptions +import com.exasol.cloudetl.util.SchemaUtil + +import com.typesafe.scalalogging.LazyLogging +import org.apache.hadoop.fs.Path + +object ExportTable extends LazyLogging { + + def run(meta: ExaMetadata, iter: ExaIterator): Unit = { + val bucketPath = iter.getString(0) + val params = Bucket.strToMap(iter.getString(1)) + val bucket = Bucket(params) + + val srcColumnNames = iter.getString(2).split("\\.") + val firstColumnIdx = 3 + val columns = getColumns(meta, srcColumnNames, firstColumnIdx) + + val parquetFilename = generateParquetFilename(meta) + val path = new Path(bucketPath, parquetFilename) + val messageType = SchemaUtil.createParquetMessageType(columns, "exasol_export_schema") + val options = ParquetWriteOptions(params) + val writer = ParquetRowWriter(path, bucket.createConfiguration(), messageType, options) + + logger.info(s"Starting export from node = '${meta.getNodeId}' and vm = '${meta.getVmId}'") + + do { + val row = getRow(iter, firstColumnIdx, columns) + logger.debug(s"Writing row '$row'") + writer.write(row) + } while (iter.next()) + + writer.close() + logger.info(s"Finished exporting from node = '${meta.getNodeId}' and vm = '${meta.getVmId}'") + } + + private[this] def generateParquetFilename(meta: ExaMetadata): String = { + val nodeId = meta.getNodeId + val vmId = meta.getVmId + val uuidStr = UUID.randomUUID.toString.replaceAll("-", "") + s"exa_export_${nodeId}_${vmId}_$uuidStr.parquet" + } + + private[this] def getRow(iter: ExaIterator, startIdx: Int, columns: Seq[ExaColumnInfo]): Row = { + val vals = columns.zipWithIndex.map { + case (col, idx) => + SchemaUtil.exaColumnToValue(iter, startIdx + idx, col) + } + Row(values = vals) + } + + /** + * Creates a sequence of [[ExaColumnInfo]] columns using an Exasol [[ExaMetadata]] input column + * methods. + * + * Set the name of the column using `srcColumnNames` parameter. Additionally, set the precision, + * scale and length using corresponding functions on Exasol metadata for input columns. 
+ * + * @param meta An Exasol [[ExaMetadata]] metadata + * @param srcColumnNames A sequence of column names per each input column in metadata + * @param startIdx A starting integer index to reference input column + * @return A sequence of [[ExaColumnInfo]] columns + */ + @SuppressWarnings(Array("org.wartremover.warts.MutableDataStructures")) + private[this] def getColumns( + meta: ExaMetadata, + srcColumnNames: Seq[String], + startIdx: Int + ): Seq[ExaColumnInfo] = { + val totalColumnCnt = meta.getInputColumnCount.toInt + val columns = ListBuffer[ExaColumnInfo]() + + for { idx <- startIdx until totalColumnCnt } columns.append( + ExaColumnInfo( + name = srcColumnNames(idx - startIdx), + `type` = meta.getInputColumnType(idx), + precision = meta.getInputColumnPrecision(idx).toInt, + scale = meta.getInputColumnScale(idx).toInt, + length = meta.getInputColumnLength(idx).toInt, + isNullable = true + ) + ) + + columns.toSeq + } + +} diff --git a/src/test/scala/com/exasol/cloudetl/scriptclasses/BaseSuite.scala b/src/test/scala/com/exasol/cloudetl/scriptclasses/BaseSuite.scala index e319e75c..b5f058e9 100644 --- a/src/test/scala/com/exasol/cloudetl/scriptclasses/BaseSuite.scala +++ b/src/test/scala/com/exasol/cloudetl/scriptclasses/BaseSuite.scala @@ -33,7 +33,6 @@ trait BaseSuite extends FunSuite with Matchers with MockitoSugar { val resourcePath: String = norm(Paths.get(getClass.getResource("/data").toURI)) val resourceImportBucket: String = s"$resourcePath/import/parquet/*.parquet" - val resourceExportBucket: String = s"$resourcePath/export/parquet/" final def norm(path: Path): String = path.toUri.toString.replaceAll("/$", "").replaceAll("///", "/") diff --git a/src/test/scala/com/exasol/cloudetl/scriptclasses/ExportTableSuite.scala b/src/test/scala/com/exasol/cloudetl/scriptclasses/ExportTableSuite.scala new file mode 100644 index 00000000..19d2ced1 --- /dev/null +++ b/src/test/scala/com/exasol/cloudetl/scriptclasses/ExportTableSuite.scala @@ -0,0 +1,146 @@ +package com.exasol.cloudetl.scriptclasses + +import java.io.IOException +import java.nio.file._ +import java.nio.file.attribute.BasicFileAttributes + +import com.exasol.ExaIterator +import com.exasol.ExaMetadata +import com.exasol.cloudetl.parquet.ParquetSource +import com.exasol.cloudetl.util.FsUtil + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.FileSystem +import org.mockito.ExtraMockito +import org.mockito.Mockito._ + +@SuppressWarnings(Array("org.wartremover.warts.JavaSerializable")) +class ExportTableSuite extends BaseSuite { + + val srcColumns: Seq[String] = Seq( + "c_int", + "c_long", + "c_decimal", + "c_double", + "c_string", + "c_boolean", + "c_date", + "c_timestamp" + ) + + final def createMockedIter(resourceDir: String): ExaIterator = { + val mockedIter = commonExaIterator(resourceDir) + when(mockedIter.getString(2)).thenReturn(srcColumns.mkString(".")) + when(mockedIter.next()).thenReturn(true, false) + + val bd1 = new java.math.BigDecimal(1337) + val bd2 = new java.math.BigDecimal(8888) + val dt1 = new java.sql.Date(System.currentTimeMillis()) + val dt2 = new java.sql.Date(System.currentTimeMillis()) + val ts1 = new java.sql.Timestamp(System.currentTimeMillis()) + val ts2 = new java.sql.Timestamp(System.currentTimeMillis()) + + when(mockedIter.getInteger(3)).thenReturn(1, 2) + when(mockedIter.getLong(4)).thenReturn(3L, 4L) + when(mockedIter.getBigDecimal(5)).thenReturn(bd1, bd2) + when(mockedIter.getDouble(6)).thenReturn(3.14, 0.13) + when(mockedIter.getString(7)).thenReturn("xyz", "abc") + 
when(mockedIter.getBoolean(8)).thenReturn(true, false) + when(mockedIter.getDate(9)).thenReturn(dt1, dt2) + when(mockedIter.getTimestamp(10)).thenReturn(ts1, ts2) + + mockedIter + } + + final def createMockedMeta(): ExaMetadata = { + val mockedMeta = mock[ExaMetadata] + when(mockedMeta.getInputColumnCount()).thenReturn(11L) + val returns = Seq( + (3, classOf[java.lang.Integer], 0L, 0L, 0L), + (4, classOf[java.lang.Long], 0L, 0L, 0L), + (5, classOf[java.math.BigDecimal], 9L, 2L, 0L), + (6, classOf[java.lang.Double], 0L, 0L, 0L), + (7, classOf[java.lang.String], 0L, 0L, 3L), + (8, classOf[java.lang.Boolean], 0L, 0L, 0L), + (9, classOf[java.sql.Date], 0L, 0L, 0L), + (10, classOf[java.sql.Timestamp], 0L, 0L, 0L) + ) + returns.foreach { + case (idx, cls, prec, scale, len) => + ExtraMockito.doReturn(cls).when(mockedMeta).getInputColumnType(idx) + when(mockedMeta.getInputColumnPrecision(idx)).thenReturn(prec) + when(mockedMeta.getInputColumnScale(idx)).thenReturn(scale) + when(mockedMeta.getInputColumnLength(idx)).thenReturn(len) + } + + mockedMeta + } + + test("`run` should export the Exasol rows from ExaIterator") { + val tempDir = Files.createTempDirectory("exportTableTest") + + val meta = createMockedMeta() + val iter = createMockedIter(tempDir.toUri.toString) + + ExportTable.run(meta, iter) + + verify(meta, times(1)).getInputColumnCount + for { idx <- 3 to 10 } { + verify(meta, times(1)).getInputColumnType(idx) + verify(meta, times(1)).getInputColumnPrecision(idx) + verify(meta, times(1)).getInputColumnScale(idx) + verify(meta, times(1)).getInputColumnLength(idx) + } + + verify(iter, times(2)).getInteger(3) + verify(iter, times(2)).getLong(4) + verify(iter, times(2)).getBigDecimal(5) + verify(iter, times(2)).getDouble(6) + verify(iter, times(2)).getString(7) + verify(iter, times(2)).getBoolean(8) + verify(iter, times(2)).getDate(9) + verify(iter, times(2)).getTimestamp(10) + + deleteFiles(tempDir) + } + + test("read exported rows from a file") { + val tempDir = Files.createTempDirectory("exportTableTest") + val meta = createMockedMeta() + val iter = createMockedIter(tempDir.toUri.toString) + + ExportTable.run(meta, iter) + + val conf = new Configuration() + val fs = FileSystem.get(conf) + + val data = ParquetSource(FsUtil.globWithLocal(tempDir, fs), fs, conf) + val iters = data.stream + + iters.foreach { iter => + iter.foreach { row => + println(s"ROW = $row") + } + } + + deleteFiles(tempDir) + } + + final def deleteFiles(dir: Path): Unit = { + Files.walkFileTree( + dir, + new SimpleFileVisitor[Path] { + override def visitFile(file: Path, attrs: BasicFileAttributes): FileVisitResult = { + Files.delete(file) + FileVisitResult.CONTINUE + } + override def postVisitDirectory(dir: Path, exc: IOException): FileVisitResult = { + Files.delete(dir) + FileVisitResult.CONTINUE + } + } + ) + () + } + +} diff --git a/src/test/scala/org/mockito/ExtraMockito.scala b/src/test/scala/org/mockito/ExtraMockito.scala new file mode 100644 index 00000000..1be7fd22 --- /dev/null +++ b/src/test/scala/org/mockito/ExtraMockito.scala @@ -0,0 +1,28 @@ +package org.mockito + +import org.mockito.stubbing.Stubber + +/** Extra helper functions for mockito mocking */ +object ExtraMockito { + + /** + * Delegates the call to Mockito.doReturn(toBeReturned, toBeReturnedNext) but fixes + * the following compiler issue that happens because the overloaded vararg on the Java side + * + * {{{ + * Error:(33, 25) ambiguous reference to overloaded definition, both method doReturn in class + * Mockito of type (x$1: Any, x$2: 
Object*)org.mockito.stubbing.Stubber and method doReturn + * in class Mockito of type (x$1: Any)org.mockito.stubbing.Stubber match argument types + * (`Type`) + * }}} + * + * This is adapted from mockito-scala project, + * - mockito-scala/blob/master/core/src/main/scala/org/mockito/MockitoAPI.scala#L59 + */ + def doReturn[T](toBeReturned: T, toBeReturnedNext: T*): Stubber = + Mockito.doReturn( + toBeReturned, + toBeReturnedNext.map(_.asInstanceOf[Object]): _* + ) + +} From e9e1dd8dff8844cd211abd4479f468baf8310362 Mon Sep 17 00:00:00 2001 From: Muhammet Orazov Date: Mon, 4 Feb 2019 14:59:43 +0100 Subject: [PATCH 13/20] Converts decimal values into int32 or int64 if precision is within bounds --- .../cloudetl/parquet/RowWriteSupport.scala | 78 +++++++++++-------- .../com/exasol/cloudetl/util/SchemaUtil.scala | 76 +++++++++++++----- .../cloudetl/util/SchemaUtilSuite.scala | 47 +++++++++-- 3 files changed, 145 insertions(+), 56 deletions(-) diff --git a/src/main/scala/com/exasol/cloudetl/parquet/RowWriteSupport.scala b/src/main/scala/com/exasol/cloudetl/parquet/RowWriteSupport.scala index ba13979b..fef4ae75 100644 --- a/src/main/scala/com/exasol/cloudetl/parquet/RowWriteSupport.scala +++ b/src/main/scala/com/exasol/cloudetl/parquet/RowWriteSupport.scala @@ -18,6 +18,14 @@ import org.apache.parquet.schema.OriginalType import org.apache.parquet.schema.PrimitiveType import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName +/** + * A Parquet [[org.apache.parquet.hadoop.api.WriteSupport]] implementation that writes + * [[com.exasol.cloudetl.data.Row]] as a Parquet data. + * + * This is mostly adapted from Spark codebase: + * - org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport + * + */ @SuppressWarnings( Array( "org.wartremover.warts.AsInstanceOf", @@ -27,15 +35,17 @@ import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName ) class RowWriteSupport(schema: MessageType) extends WriteSupport[Row] { - private final val DECIMAL_MAX_PRECISION: Int = 38 - // private final val DECIMAL_MAX_INT_DIGITS: Int = 9 - // private final val DECIMAL_MAX_LONG_DIGITS: Int = 18 + // The number bytes required for timestamp buffer in Parquet private final val TIMESTAMP_MAX_BYTE_SIZE: Int = 12 + // This is a type that is responsible for writing a value in Row values index to the + // RecordConsumer private type RowValueWriter = (Row, Int) => Unit + // A list of `RowValueWriter`-s for each field type of Parquet `schema` private var rootFieldWriters: Array[RowValueWriter] = _ + // A Parquet RecordConsumer that all values of a Row will be written private var recordConsumer: RecordConsumer = _ // Reusable byte array used to write timestamps as Parquet INT96 values @@ -43,7 +53,7 @@ class RowWriteSupport(schema: MessageType) extends WriteSupport[Row] { // Reusable byte array used to write decimal values as Parquet FIXED_LEN_BYTE_ARRAY values private val decimalBuffer = - new Array[Byte](SchemaUtil.PRECISION_TO_BYTE_SIZE(DECIMAL_MAX_PRECISION - 1)) + new Array[Byte](SchemaUtil.PRECISION_TO_BYTE_SIZE(SchemaUtil.DECIMAL_MAX_PRECISION - 1)) final override def init(configuration: Configuration): WriteSupport.WriteContext = { this.rootFieldWriters = schema.getFields.asScala @@ -106,6 +116,9 @@ class RowWriteSupport(schema: MessageType) extends WriteSupport[Row] { originalType match { case OriginalType.DATE => makeDateWriter() + case OriginalType.DECIMAL => + val decimalMetadata = primitiveType.getDecimalMetadata + makeDecimalWriter(decimalMetadata.getPrecision, decimalMetadata.getScale) case _ => 
(row: Row, index: Int) => recordConsumer.addInteger(row.values(index).asInstanceOf[Integer]) @@ -179,29 +192,31 @@ class RowWriteSupport(schema: MessageType) extends WriteSupport[Row] { s"Decimal precision $precision should not be less than minimum precision 1" ) require( - precision <= DECIMAL_MAX_PRECISION, - s"Decimal precision $precision should not exceed max precision $DECIMAL_MAX_PRECISION" + precision <= SchemaUtil.DECIMAL_MAX_PRECISION, + s"""|Decimal precision $precision should not exceed + |max precision ${SchemaUtil.DECIMAL_MAX_PRECISION} + """.stripMargin ) // The number of bytes from given the precision val numBytes = SchemaUtil.PRECISION_TO_BYTE_SIZE(precision - 1) - // val int32Writer = (row: Row, index: Int) => { - // val bigDecimalInt = - // row - // .values(index) - // .asInstanceOf[java.math.BigDecimal] - // .unscaledValue() - // .longValueExact() - // .toInt - // recordConsumer.addInteger(bigDecimalInt) - // } - - // val int64Writer = (row: Row, index: Int) => { - // val bigDecimalLong = - // row.values(index).asInstanceOf[java.math.BigDecimal].unscaledValue().longValueExact() - // recordConsumer.addLong(bigDecimalLong) - // } + val int32Writer = (row: Row, index: Int) => { + val bigDecimalInt = + row + .values(index) + .asInstanceOf[java.math.BigDecimal] + .unscaledValue() + .longValueExact() + .toInt + recordConsumer.addInteger(bigDecimalInt) + } + + val int64Writer = (row: Row, index: Int) => { + val bigDecimalLong = + row.values(index).asInstanceOf[java.math.BigDecimal].unscaledValue().longValueExact() + recordConsumer.addLong(bigDecimalLong) + } val bytesWriter = (row: Row, index: Int) => { val decimal = row.values(index).asInstanceOf[java.math.BigDecimal] @@ -231,15 +246,16 @@ class RowWriteSupport(schema: MessageType) extends WriteSupport[Row] { recordConsumer.addBinary(Binary.fromReusedByteArray(fixedLenBytesArray, 0, numBytes)) } - // if (precision <= DECIMAL_MAX_INT_DIGITS) { // 1 <= precision <= 9, writes as INT32 - // int32Writer - // } else if (precision <= DECIMAL_MAX_LONG_DIGITS) { // 10 <= precision <= 18, writes as INT64 - // int64Writer - // } else { // 19 <= precision <= 38, writes as FIXED_LEN_BYTE_ARRAY - // bytesWriter - // } - - bytesWriter + if (precision <= SchemaUtil.DECIMAL_MAX_INT_DIGITS) { + // 1 <= precision <= 9, writes as INT32 + int32Writer + } else if (precision <= SchemaUtil.DECIMAL_MAX_LONG_DIGITS) { + // 10 <= precision <= 18, writes as INT64 + int64Writer + } else { + // 19 <= precision <= 38, writes as FIXED_LEN_BYTE_ARRAY + bytesWriter + } } } diff --git a/src/main/scala/com/exasol/cloudetl/util/SchemaUtil.scala b/src/main/scala/com/exasol/cloudetl/util/SchemaUtil.scala index 3cc55bff..7c489f05 100644 --- a/src/main/scala/com/exasol/cloudetl/util/SchemaUtil.scala +++ b/src/main/scala/com/exasol/cloudetl/util/SchemaUtil.scala @@ -12,6 +12,10 @@ import org.apache.parquet.schema.Types object SchemaUtil { + val DECIMAL_MAX_PRECISION: Int = 38 + val DECIMAL_MAX_INT_DIGITS: Int = 9 + val DECIMAL_MAX_LONG_DIGITS: Int = 18 + // Maps the precision value into the number of bytes // Adapted from: // - org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe.java @@ -54,47 +58,81 @@ object SchemaUtil { val colType = colInfo.`type` val repetition = if (colInfo.isNullable) Repetition.OPTIONAL else Repetition.REQUIRED + // Given a numeric type (int, long, bigDecimal) with precision more than zero, encodes it as + // Parquet INT32, INT64 or FIXED_LEN_BYTE_ARRAY type. 
+ // + // - for 1 <= precision <= 9, use INT32 + // - for 1 <= precision <= 18, use INT64 + // - otherwise, use FIXED_LEN_BYTE_ARRAY + def makeDecimalType(precision: Int): Type = { + require( + precision > 0, + s"""|The precision should be larger than zero for type '$colType' in order to encode + |it as numeric (int32, int64, fixed_len_array) type. + """.stripMargin + ) + if (precision <= DECIMAL_MAX_INT_DIGITS) { + Types + .primitive(PrimitiveTypeName.INT32, repetition) + .precision(precision) + .scale(colInfo.scale) + .as(OriginalType.DECIMAL) + .named(colName) + } else if (precision <= DECIMAL_MAX_LONG_DIGITS) { + Types + .primitive(PrimitiveTypeName.INT64, repetition) + .precision(precision) + .scale(colInfo.scale) + .as(OriginalType.DECIMAL) + .named(colName) + } else { + Types + .primitive(PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, repetition) + .precision(precision) + .scale(colInfo.scale) + .length(PRECISION_TO_BYTE_SIZE(precision - 1)) + .as(OriginalType.DECIMAL) + .named(colName) + } + } + import JTypes._ colType match { case `jInteger` => - if (colInfo.precision > 0) { + if (colInfo.precision == 0) { + Types + .primitive(PrimitiveTypeName.INT32, repetition) + .named(colName) + } else if (colInfo.precision <= DECIMAL_MAX_INT_DIGITS) { Types - .primitive(PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, repetition) + .primitive(PrimitiveTypeName.INT32, repetition) .precision(colInfo.precision) .scale(colInfo.scale) - .length(PRECISION_TO_BYTE_SIZE(colInfo.precision - 1)) .as(OriginalType.DECIMAL) .named(colName) } else { - Types - .primitive(PrimitiveTypeName.INT32, repetition) - .named(colName) + makeDecimalType(colInfo.precision) } case `jLong` => - if (colInfo.precision > 0) { + if (colInfo.precision == 0) { + Types + .primitive(PrimitiveTypeName.INT64, repetition) + .named(colName) + } else if (colInfo.precision <= DECIMAL_MAX_LONG_DIGITS) { Types - .primitive(PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, repetition) + .primitive(PrimitiveTypeName.INT64, repetition) .precision(colInfo.precision) .scale(colInfo.scale) - .length(PRECISION_TO_BYTE_SIZE(colInfo.precision - 1)) .as(OriginalType.DECIMAL) .named(colName) } else { - Types - .primitive(PrimitiveTypeName.INT64, repetition) - .named(colName) + makeDecimalType(colInfo.precision) } case `jBigDecimal` => - Types - .primitive(PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, repetition) - .precision(colInfo.precision) - .scale(colInfo.scale) - .length(PRECISION_TO_BYTE_SIZE(colInfo.precision - 1)) - .as(OriginalType.DECIMAL) - .named(colName) + makeDecimalType(colInfo.precision) case `jDouble` => Types diff --git a/src/test/scala/com/exasol/cloudetl/util/SchemaUtilSuite.scala b/src/test/scala/com/exasol/cloudetl/util/SchemaUtilSuite.scala index a028ed39..7de2cc41 100644 --- a/src/test/scala/com/exasol/cloudetl/util/SchemaUtilSuite.scala +++ b/src/test/scala/com/exasol/cloudetl/util/SchemaUtilSuite.scala @@ -29,9 +29,14 @@ class SchemaUtilSuite extends FunSuite with Matchers with MockitoSugar { val exaColumns = Seq( ExaColumnInfo("c_int", classOf[java.lang.Integer], 0, 0, 0, true), - ExaColumnInfo("c_int", classOf[java.lang.Integer], 1, 0, 1, true), + ExaColumnInfo("c_int", classOf[java.lang.Integer], 1, 0, 0, true), + ExaColumnInfo("c_int", classOf[java.lang.Integer], 10, 0, 0, true), + ExaColumnInfo("c_int", classOf[java.lang.Integer], 23, 0, 10, true), ExaColumnInfo("c_long", classOf[java.lang.Long], 0, 0, 0, false), - ExaColumnInfo("c_long", classOf[java.lang.Long], 7, 3, 4, true), + ExaColumnInfo("c_long", classOf[java.lang.Long], 18, 9, 0, true), + 
ExaColumnInfo("c_long", classOf[java.lang.Long], 20, 3, 9, true), + ExaColumnInfo("c_decimal_int", classOf[java.math.BigDecimal], 9, 0, 0, false), + ExaColumnInfo("c_decimal_long", classOf[java.math.BigDecimal], 17, 0, 0, false), ExaColumnInfo("c_decimal", classOf[java.math.BigDecimal], 38, 10, 16, false), ExaColumnInfo("c_double", classOf[java.lang.Double], 0, 0, 0, true), ExaColumnInfo("c_string", classOf[java.lang.String], 0, 0, 0, false), @@ -47,20 +52,50 @@ class SchemaUtilSuite extends FunSuite with Matchers with MockitoSugar { schemaName, new PrimitiveType(Repetition.OPTIONAL, PrimitiveType.PrimitiveTypeName.INT32, "c_int"), Types - .primitive(PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, Repetition.OPTIONAL) + .primitive(PrimitiveTypeName.INT32, Repetition.OPTIONAL) .precision(1) .scale(0) - .length(1) + .as(OriginalType.DECIMAL) + .named("c_int"), + Types + .primitive(PrimitiveTypeName.INT64, Repetition.OPTIONAL) + .precision(10) + .scale(0) + .as(OriginalType.DECIMAL) + .named("c_int"), + Types + .primitive(PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, Repetition.OPTIONAL) + .precision(23) + .scale(0) + .length(10) .as(OriginalType.DECIMAL) .named("c_int"), new PrimitiveType(Repetition.REQUIRED, PrimitiveType.PrimitiveTypeName.INT64, "c_long"), + Types + .primitive(PrimitiveTypeName.INT64, Repetition.OPTIONAL) + .precision(18) + .scale(9) + .as(OriginalType.DECIMAL) + .named("c_long"), Types .primitive(PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, Repetition.OPTIONAL) - .precision(7) + .precision(20) .scale(3) - .length(4) + .length(9) .as(OriginalType.DECIMAL) .named("c_long"), + Types + .primitive(PrimitiveTypeName.INT32, Repetition.REQUIRED) + .precision(9) + .scale(0) + .as(OriginalType.DECIMAL) + .named("c_decimal_int"), + Types + .primitive(PrimitiveTypeName.INT64, Repetition.REQUIRED) + .precision(17) + .scale(0) + .as(OriginalType.DECIMAL) + .named("c_decimal_long"), Types .primitive(PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, Repetition.REQUIRED) .precision(38) From b32e100ac7212b94115f1679f817215a1110ece7 Mon Sep 17 00:00:00 2001 From: Muhammet Orazov Date: Mon, 4 Feb 2019 17:40:09 +0100 Subject: [PATCH 14/20] Use type method to obtain primitive type --- .../scala/com/exasol/cloudetl/parquet/RowWriteSupport.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/scala/com/exasol/cloudetl/parquet/RowWriteSupport.scala b/src/main/scala/com/exasol/cloudetl/parquet/RowWriteSupport.scala index fef4ae75..648f63cf 100644 --- a/src/main/scala/com/exasol/cloudetl/parquet/RowWriteSupport.scala +++ b/src/main/scala/com/exasol/cloudetl/parquet/RowWriteSupport.scala @@ -59,7 +59,7 @@ class RowWriteSupport(schema: MessageType) extends WriteSupport[Row] { this.rootFieldWriters = schema.getFields.asScala .map { case field => - makeWriter(field.asInstanceOf[PrimitiveType]) + makeWriter(field.asPrimitiveType()) } .toArray[RowValueWriter] From 9f1d1f5014eef5c81a76fe3e647409ddfda9296c Mon Sep 17 00:00:00 2001 From: Muhammet Orazov Date: Tue, 5 Feb 2019 17:49:25 +0100 Subject: [PATCH 15/20] Add date and timestamp reader functionality Should fix #15 --- .../cloudetl/parquet/RowReadSupport.scala | 154 ++++++++++++++++-- .../cloudetl/parquet/RowWriteSupport.scala | 21 +-- .../exasol/cloudetl/util/DateTimeUtil.scala | 78 +++++++++ .../scriptclasses/ExportTableSuite.scala | 49 +++--- 4 files changed, 245 insertions(+), 57 deletions(-) create mode 100644 src/main/scala/com/exasol/cloudetl/util/DateTimeUtil.scala diff --git a/src/main/scala/com/exasol/cloudetl/parquet/RowReadSupport.scala 
b/src/main/scala/com/exasol/cloudetl/parquet/RowReadSupport.scala index df884275..9d4d87b4 100644 --- a/src/main/scala/com/exasol/cloudetl/parquet/RowReadSupport.scala +++ b/src/main/scala/com/exasol/cloudetl/parquet/RowReadSupport.scala @@ -1,6 +1,11 @@ package com.exasol.cloudetl.parquet +import java.math.BigInteger +import java.math.MathContext +import java.nio.ByteOrder + import com.exasol.cloudetl.data.Row +import com.exasol.cloudetl.util.DateTimeUtil import org.apache.hadoop.conf.Configuration import org.apache.parquet.hadoop.api.ReadSupport @@ -12,6 +17,9 @@ import org.apache.parquet.io.api.PrimitiveConverter import org.apache.parquet.io.api.RecordMaterializer import org.apache.parquet.schema.GroupType import org.apache.parquet.schema.MessageType +import org.apache.parquet.schema.OriginalType +import org.apache.parquet.schema.PrimitiveType +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName import org.apache.parquet.schema.Type @SuppressWarnings(Array("org.wartremover.contrib.warts.UnsafeInheritance")) @@ -61,9 +69,9 @@ class RowRootConverter(schema: GroupType) extends GroupConverter { private def createNewConverter(tpe: Type, idx: Int): Converter = { if (!tpe.isPrimitive()) { - throw new IllegalArgumentException("Currently only primitive types are supported") + throw new UnsupportedOperationException("Currently only primitive types are supported") } - new RowPrimitiveConverter(this, idx) + makeReader(tpe.asPrimitiveType(), idx) } def currentResult(): Array[Any] = @@ -77,26 +85,138 @@ class RowRootConverter(schema: GroupType) extends GroupConverter { override def end(): Unit = {} -} + private def makeReader(primitiveType: PrimitiveType, idx: Int): Converter = { + val typeName = primitiveType.getPrimitiveTypeName + val originalType = primitiveType.getOriginalType + + typeName match { + case PrimitiveTypeName.INT32 => + originalType match { + case OriginalType.DATE => new RowDateConverter(this, idx) + case OriginalType.DECIMAL => + val decimalMetadata = primitiveType.getDecimalMetadata + new RowDecimalConverter( + this, + idx, + decimalMetadata.getPrecision, + decimalMetadata.getScale + ) + case _ => new RowPrimitiveConverter(this, idx) + } + case PrimitiveTypeName.BOOLEAN => new RowPrimitiveConverter(this, idx) + case PrimitiveTypeName.DOUBLE => new RowPrimitiveConverter(this, idx) + case PrimitiveTypeName.FLOAT => new RowPrimitiveConverter(this, idx) + + case PrimitiveTypeName.BINARY => + originalType match { + case OriginalType.UTF8 => new RowStringConverter(this, idx) + case _ => new RowPrimitiveConverter(this, idx) + } + case PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY => + originalType match { + case OriginalType.DECIMAL => + val decimalMetadata = primitiveType.getDecimalMetadata + new RowDecimalConverter( + this, + idx, + decimalMetadata.getPrecision, + decimalMetadata.getScale + ) + case _ => new RowPrimitiveConverter(this, idx) + } + case PrimitiveTypeName.INT64 => + originalType match { + case OriginalType.TIMESTAMP_MILLIS => new RowTimestampConverter(this, idx) + case OriginalType.DECIMAL => + val decimalMetadata = primitiveType.getDecimalMetadata + new RowDecimalConverter( + this, + idx, + decimalMetadata.getPrecision, + decimalMetadata.getScale + ) + case _ => new RowPrimitiveConverter(this, idx) + } + + case PrimitiveTypeName.INT96 => new RowTimestampConverter(this, idx) + + case _ => + throw new UnsupportedOperationException( + s"Parquet type '$typeName' cannot be read into Exasol type." 
+ ) + } + } + + private final class RowPrimitiveConverter(val parent: RowRootConverter, val index: Int) + extends PrimitiveConverter { + + override def addBinary(value: Binary): Unit = + parent.currentResult.update(index, value.getBytes()) + + override def addBoolean(value: Boolean): Unit = + parent.currentResult.update(index, value) + + override def addDouble(value: Double): Unit = + parent.currentResult.update(index, value) + + override def addFloat(value: Float): Unit = + parent.currentResult.update(index, value) + + override def addInt(value: Int): Unit = + parent.currentResult.update(index, value) -final class RowPrimitiveConverter(val parent: RowRootConverter, val index: Int) - extends PrimitiveConverter { + override def addLong(value: Long): Unit = + parent.currentResult.update(index, value) + } + + final class RowStringConverter(val parent: RowRootConverter, val index: Int) + extends PrimitiveConverter { + override def addBinary(value: Binary): Unit = + parent.currentResult.update(index, value.toStringUsingUTF8()) + } + + private final class RowDecimalConverter( + val parent: RowRootConverter, + val index: Int, + precision: Int, + scale: Int + ) extends PrimitiveConverter { + // Converts decimals stored as INT32 + override def addInt(value: Int): Unit = + parent.currentResult.update(index, value) + + // Converts decimals stored as INT64 + override def addLong(value: Long): Unit = + parent.currentResult.update(index, value) + + override def addBinary(value: Binary): Unit = { + val bi = new BigInteger(value.getBytes) + val bd = BigDecimal.apply(bi, scale, new MathContext(precision)) + parent.currentResult.update(index, bd) + } + } - override def addBinary(value: Binary): Unit = - parent.currentResult.update(index, value.toStringUsingUTF8()) + private final class RowTimestampConverter(val parent: RowRootConverter, val index: Int) + extends PrimitiveConverter { - override def addBoolean(value: Boolean): Unit = - parent.currentResult.update(index, value) + override def addBinary(value: Binary): Unit = { + val buf = value.toByteBuffer.order(ByteOrder.LITTLE_ENDIAN) + val nanos = buf.getLong + val days = buf.getInt + val micros = DateTimeUtil.getMicrosFromJulianDay(days, nanos) + val ts = DateTimeUtil.getTimestampFromMicros(micros) - override def addDouble(value: Double): Unit = - parent.currentResult.update(index, value) + parent.currentResult.update(index, ts) + } + } - override def addFloat(value: Float): Unit = - parent.currentResult.update(index, value) + private final class RowDateConverter(val parent: RowRootConverter, val index: Int) + extends PrimitiveConverter { - override def addInt(value: Int): Unit = - parent.currentResult.update(index, value) + override def addInt(value: Int): Unit = { + val date = DateTimeUtil.daysToDate(value.toLong) + parent.currentResult.update(index, date) + } + } - override def addLong(value: Long): Unit = - parent.currentResult.update(index, value) } diff --git a/src/main/scala/com/exasol/cloudetl/parquet/RowWriteSupport.scala b/src/main/scala/com/exasol/cloudetl/parquet/RowWriteSupport.scala index 648f63cf..ccfa8642 100644 --- a/src/main/scala/com/exasol/cloudetl/parquet/RowWriteSupport.scala +++ b/src/main/scala/com/exasol/cloudetl/parquet/RowWriteSupport.scala @@ -6,6 +6,7 @@ import java.nio.ByteOrder import scala.collection.JavaConverters._ import com.exasol.cloudetl.data.Row +import com.exasol.cloudetl.util.DateTimeUtil import com.exasol.cloudetl.util.SchemaUtil import org.apache.hadoop.conf.Configuration @@ -154,31 +155,17 @@ class 
RowWriteSupport(schema: MessageType) extends WriteSupport[Row] { } private def makeDateWriter(): RowValueWriter = (row: Row, index: Int) => { - import java.time._ - import java.time.temporal.ChronoUnit - - val UnixEpoch = LocalDate.of(1970, 1, 1) // scalastyle:ignore magic.number - // Write the number of days since unix epoch as integer val date = row.values(index).asInstanceOf[java.sql.Date] - val localDate = Instant.ofEpochMilli(date.getTime).atZone(ZoneId.of("UTC")).toLocalDate - val days = ChronoUnit.DAYS.between(UnixEpoch, localDate) + val days = DateTimeUtil.daysSinceEpoch(date) recordConsumer.addInteger(days.toInt) } private def makeTimestampWriter(): RowValueWriter = (row: Row, index: Int) => { - import java.time._ - import java.time.temporal.ChronoUnit - - val JulianEpochInGregorian = - LocalDateTime.of(-4713, 11, 24, 0, 0, 0) // scalastyle:ignore magic.number - val timestamp = row.values(index).asInstanceOf[java.sql.Timestamp] - val dt = Instant.ofEpochMilli(timestamp.getTime).atZone(ZoneId.of("UTC")) - val days = ChronoUnit.DAYS.between(JulianEpochInGregorian, dt).toInt - val nanos = timestamp.getNanos + ChronoUnit.NANOS - .between(dt.toLocalDate.atStartOfDay(ZoneId.of("UTC")).toLocalTime, dt.toLocalTime) + val micros = DateTimeUtil.getMicrosFromTimestamp(timestamp) + val (days, nanos) = DateTimeUtil.getJulianDayAndNanos(micros) val buf = ByteBuffer.wrap(timestampBuffer) val _ = buf.order(ByteOrder.LITTLE_ENDIAN).putLong(nanos).putInt(days) diff --git a/src/main/scala/com/exasol/cloudetl/util/DateTimeUtil.scala b/src/main/scala/com/exasol/cloudetl/util/DateTimeUtil.scala new file mode 100644 index 00000000..5a3f23ee --- /dev/null +++ b/src/main/scala/com/exasol/cloudetl/util/DateTimeUtil.scala @@ -0,0 +1,78 @@ +package com.exasol.cloudetl.util + +import java.sql.Date +import java.sql.Timestamp +import java.time._ +import java.time.temporal.ChronoUnit + +/** + * Helper functions to convert date time values + */ +object DateTimeUtil { + // scalastyle:off magic.number + val UnixEpochDate: LocalDate = LocalDate.of(1970, 1, 1) + val UnixEpochDateTime: LocalDateTime = LocalDateTime.of(1970, 1, 1, 0, 0, 0) + // scalastyle:on magic.number + + val JULIAN_DAY_OF_EPOCH: Long = 2440588 + val SECONDS_PER_DAY: Long = 60 * 60 * 24L + val MILLIS_PER_SECOND: Long = 1000L + val MICROS_PER_MILLIS: Long = 1000L + val MICROS_PER_SECOND: Long = MICROS_PER_MILLIS * MILLIS_PER_SECOND + val MICROS_PER_DAY: Long = MICROS_PER_SECOND * SECONDS_PER_DAY + + /** Returns a [[java.sql.Timestamp]] timestamp from number of microseconds since epoch */ + @SuppressWarnings(Array("org.wartremover.warts.Var")) + def getTimestampFromMicros(us: Long): Timestamp = { + // setNanos() will overwrite the millisecond part, so the milliseconds should be cut off at + // seconds + var seconds = us / MICROS_PER_SECOND + var micros = us % MICROS_PER_SECOND + if (micros < 0) { // setNanos() can not accept negative value + micros += MICROS_PER_SECOND + seconds -= 1 + } + val ts = new Timestamp(seconds * 1000) + ts.setNanos(micros.toInt * 1000) + + ts + } + + /** Returns the number of micros since epoch from [[java.sql.Timestamp]] */ + def getMicrosFromTimestamp(ts: Timestamp): Long = + if (ts != null) { + ts.getTime() * 1000L + (ts.getNanos().toLong / 1000) % 1000L + } else { + 0L + } + + /** Returns Julian day and nanoseconds in a day from microseconds since epoch */ + @SuppressWarnings(Array("org.wartremover.contrib.warts.ExposedTuples")) + def getJulianDayAndNanos(us: Long): (Int, Long) = { + val julian_us = us + 
JULIAN_DAY_OF_EPOCH * MICROS_PER_DAY + val day = julian_us / MICROS_PER_DAY + val micros = julian_us % MICROS_PER_DAY + (day.toInt, micros * 1000L) + } + + /** Returns microseconds since epoch from Julian day and nanoseconds in a day */ + def getMicrosFromJulianDay(day: Int, nanos: Long): Long = { + val seconds = (day - JULIAN_DAY_OF_EPOCH).toLong * SECONDS_PER_DAY + seconds * MICROS_PER_SECOND + nanos / 1000L + } + + /** Returns the number of days since unix epoch */ + def daysSinceEpoch(date: Date): Long = { + val localDate = Instant.ofEpochMilli(date.getTime).atZone(ZoneId.systemDefault).toLocalDate + val days = ChronoUnit.DAYS.between(UnixEpochDate, localDate) + days + } + + /** Returns a [[java.sql.Date]] date given the days since epoch */ + def daysToDate(days: Long): Date = { + val date = UnixEpochDateTime.plusDays(days) + val millis = date.atZone(ZoneId.systemDefault).toInstant.toEpochMilli + new Date(millis) + } + +} diff --git a/src/test/scala/com/exasol/cloudetl/scriptclasses/ExportTableSuite.scala b/src/test/scala/com/exasol/cloudetl/scriptclasses/ExportTableSuite.scala index 19d2ced1..6a50a175 100644 --- a/src/test/scala/com/exasol/cloudetl/scriptclasses/ExportTableSuite.scala +++ b/src/test/scala/com/exasol/cloudetl/scriptclasses/ExportTableSuite.scala @@ -6,11 +6,8 @@ import java.nio.file.attribute.BasicFileAttributes import com.exasol.ExaIterator import com.exasol.ExaMetadata -import com.exasol.cloudetl.parquet.ParquetSource -import com.exasol.cloudetl.util.FsUtil -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.FileSystem +import org.mockito.ArgumentMatchers.any import org.mockito.ExtraMockito import org.mockito.Mockito._ @@ -28,18 +25,25 @@ class ExportTableSuite extends BaseSuite { "c_timestamp" ) + private val bd1 = new java.math.BigDecimal("5555555555555555555555555555555.55555") + private val bd2 = new java.math.BigDecimal("5555555555555555555555555555555.55555") + private val dt1 = new java.sql.Date(System.currentTimeMillis()) + private val dt2 = new java.sql.Date(System.currentTimeMillis()) + private val ts1 = new java.sql.Timestamp(System.currentTimeMillis()) + private val ts2 = new java.sql.Timestamp(System.currentTimeMillis()) + + val records: Seq[Seq[Object]] = Seq( + Seq(1, 3L, bd1, 3.14d, "xyz", true, dt1, ts1), + Seq(2, 4L, bd2, 0.13d, "abc", false, dt2, ts2) + ).map { seq => + seq.map(_.asInstanceOf[AnyRef]) + } + final def createMockedIter(resourceDir: String): ExaIterator = { val mockedIter = commonExaIterator(resourceDir) when(mockedIter.getString(2)).thenReturn(srcColumns.mkString(".")) when(mockedIter.next()).thenReturn(true, false) - val bd1 = new java.math.BigDecimal(1337) - val bd2 = new java.math.BigDecimal(8888) - val dt1 = new java.sql.Date(System.currentTimeMillis()) - val dt2 = new java.sql.Date(System.currentTimeMillis()) - val ts1 = new java.sql.Timestamp(System.currentTimeMillis()) - val ts2 = new java.sql.Timestamp(System.currentTimeMillis()) - when(mockedIter.getInteger(3)).thenReturn(1, 2) when(mockedIter.getLong(4)).thenReturn(3L, 4L) when(mockedIter.getBigDecimal(5)).thenReturn(bd1, bd2) @@ -58,7 +62,7 @@ class ExportTableSuite extends BaseSuite { val returns = Seq( (3, classOf[java.lang.Integer], 0L, 0L, 0L), (4, classOf[java.lang.Long], 0L, 0L, 0L), - (5, classOf[java.math.BigDecimal], 9L, 2L, 0L), + (5, classOf[java.math.BigDecimal], 36L, 5L, 0L), (6, classOf[java.lang.Double], 0L, 0L, 0L), (7, classOf[java.lang.String], 0L, 0L, 3L), (8, classOf[java.lang.Boolean], 0L, 0L, 0L), @@ -104,24 +108,23 @@ class 
ExportTableSuite extends BaseSuite { deleteFiles(tempDir) } - test("read exported rows from a file") { - val tempDir = Files.createTempDirectory("exportTableTest") + test("import exported rows from a file") { + val tempDir = Files.createTempDirectory("importExportTableTest") val meta = createMockedMeta() val iter = createMockedIter(tempDir.toUri.toString) ExportTable.run(meta, iter) - val conf = new Configuration() - val fs = FileSystem.get(conf) + val importIter = commonExaIterator(resourceImportBucket) + when(importIter.next()).thenReturn(false) + when(importIter.getString(2)).thenReturn(tempDir.toUri.toString) - val data = ParquetSource(FsUtil.globWithLocal(tempDir, fs), fs, conf) - val iters = data.stream + ImportFiles.run(mock[ExaMetadata], importIter) - iters.foreach { iter => - iter.foreach { row => - println(s"ROW = $row") - } - } + val totalRecords = 2 + verify(importIter, times(totalRecords)).emit(Seq(any[Object]): _*) + + // TODO: verify each emitted row deleteFiles(tempDir) } From c6013c2b70ce1ee28721b56cce6a14b621165ca5 Mon Sep 17 00:00:00 2001 From: Muhammet Orazov Date: Fri, 8 Feb 2019 11:40:49 +0100 Subject: [PATCH 16/20] [skip ci] Minor reformatting notice in readme.md --- README.md | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 8a544a17..aee22c8d 100644 --- a/README.md +++ b/README.md @@ -4,7 +4,11 @@ [![Codecov][codecov-badge]][codecov-link] [![GitHub Latest Release][gh-release-badge]][gh-release-link] -
-🛈 Please note that this is an open source project which is officially supported by Exasol. For any question, you can contact our support team.
+🛈 Please note that this is an open
+source project which is officially supported by Exasol. For any question, you
+can contact our support team.

 ## Table of Contents

From a8daf06a30a33a25a64b381422d2e40533736cd3 Mon Sep 17 00:00:00 2001
From: Muhammet Orazov
Date: Mon, 11 Feb 2019 11:01:46 +0100
Subject: [PATCH 17/20] Do not separate decimal values into int32 or int64 values

This does not make sense in this case because, both when reading and when
writing, Exasol provides the correct Java type, e.g. BigDecimal for a decimal
with precision and scale, and a regular Integer or Long for int32 or int64.
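As a rough sketch of what this means for the schema mapping (illustrative only,
not part of the patch; the real rules live in SchemaUtil below), the physical
Parquet type for these columns simply follows the Java class Exasol hands over:

    import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName

    import com.exasol.cloudetl.data.ExaColumnInfo

    // Illustrative helper (hypothetical name): pick the Parquet physical type
    // from the Java class of an Exasol column, as described above.
    def physicalTypeFor(col: ExaColumnInfo): PrimitiveTypeName = col.`type` match {
      case c if c == classOf[java.lang.Integer]    => PrimitiveTypeName.INT32
      case c if c == classOf[java.lang.Long]       => PrimitiveTypeName.INT64
      case c if c == classOf[java.math.BigDecimal] => PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY
      case other =>
        throw new IllegalArgumentException(s"Type '$other' is not covered in this sketch.")
    }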
- // - // - for 1 <= precision <= 9, use INT32 - // - for 1 <= precision <= 18, use INT64 - // - otherwise, use FIXED_LEN_BYTE_ARRAY - def makeDecimalType(precision: Int): Type = { - require( - precision > 0, - s"""|The precision should be larger than zero for type '$colType' in order to encode - |it as numeric (int32, int64, fixed_len_array) type. - """.stripMargin - ) - if (precision <= DECIMAL_MAX_INT_DIGITS) { - Types - .primitive(PrimitiveTypeName.INT32, repetition) - .precision(precision) - .scale(colInfo.scale) - .as(OriginalType.DECIMAL) - .named(colName) - } else if (precision <= DECIMAL_MAX_LONG_DIGITS) { - Types - .primitive(PrimitiveTypeName.INT64, repetition) - .precision(precision) - .scale(colInfo.scale) - .as(OriginalType.DECIMAL) - .named(colName) - } else { - Types - .primitive(PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, repetition) - .precision(precision) - .scale(colInfo.scale) - .length(PRECISION_TO_BYTE_SIZE(precision - 1)) - .as(OriginalType.DECIMAL) - .named(colName) - } - } - import JTypes._ colType match { @@ -104,15 +66,17 @@ object SchemaUtil { Types .primitive(PrimitiveTypeName.INT32, repetition) .named(colName) - } else if (colInfo.precision <= DECIMAL_MAX_INT_DIGITS) { + } else { + require( + colInfo.precision <= DECIMAL_MAX_INT_DIGITS, + s"Got an 'Integer' type with more than '$DECIMAL_MAX_INT_DIGITS' precision." + ) Types .primitive(PrimitiveTypeName.INT32, repetition) .precision(colInfo.precision) .scale(colInfo.scale) .as(OriginalType.DECIMAL) .named(colName) - } else { - makeDecimalType(colInfo.precision) } case `jLong` => @@ -120,19 +84,27 @@ object SchemaUtil { Types .primitive(PrimitiveTypeName.INT64, repetition) .named(colName) - } else if (colInfo.precision <= DECIMAL_MAX_LONG_DIGITS) { + } else { + require( + colInfo.precision <= DECIMAL_MAX_LONG_DIGITS, + s"Got a 'Long' type with more than '$DECIMAL_MAX_LONG_DIGITS' precision." 
+ ) Types .primitive(PrimitiveTypeName.INT64, repetition) .precision(colInfo.precision) .scale(colInfo.scale) .as(OriginalType.DECIMAL) .named(colName) - } else { - makeDecimalType(colInfo.precision) } case `jBigDecimal` => - makeDecimalType(colInfo.precision) + Types + .primitive(PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, repetition) + .precision(colInfo.precision) + .scale(colInfo.scale) + .length(PRECISION_TO_BYTE_SIZE(colInfo.precision - 1)) + .as(OriginalType.DECIMAL) + .named(colName) case `jDouble` => Types diff --git a/src/test/scala/com/exasol/cloudetl/util/SchemaUtilSuite.scala b/src/test/scala/com/exasol/cloudetl/util/SchemaUtilSuite.scala index 7de2cc41..7b6f0adf 100644 --- a/src/test/scala/com/exasol/cloudetl/util/SchemaUtilSuite.scala +++ b/src/test/scala/com/exasol/cloudetl/util/SchemaUtilSuite.scala @@ -30,11 +30,9 @@ class SchemaUtilSuite extends FunSuite with Matchers with MockitoSugar { val exaColumns = Seq( ExaColumnInfo("c_int", classOf[java.lang.Integer], 0, 0, 0, true), ExaColumnInfo("c_int", classOf[java.lang.Integer], 1, 0, 0, true), - ExaColumnInfo("c_int", classOf[java.lang.Integer], 10, 0, 0, true), - ExaColumnInfo("c_int", classOf[java.lang.Integer], 23, 0, 10, true), + ExaColumnInfo("c_int", classOf[java.lang.Integer], 9, 0, 0, true), ExaColumnInfo("c_long", classOf[java.lang.Long], 0, 0, 0, false), - ExaColumnInfo("c_long", classOf[java.lang.Long], 18, 9, 0, true), - ExaColumnInfo("c_long", classOf[java.lang.Long], 20, 3, 9, true), + ExaColumnInfo("c_long", classOf[java.lang.Long], 18, 0, 0, true), ExaColumnInfo("c_decimal_int", classOf[java.math.BigDecimal], 9, 0, 0, false), ExaColumnInfo("c_decimal_long", classOf[java.math.BigDecimal], 17, 0, 0, false), ExaColumnInfo("c_decimal", classOf[java.math.BigDecimal], 38, 10, 16, false), @@ -58,42 +56,30 @@ class SchemaUtilSuite extends FunSuite with Matchers with MockitoSugar { .as(OriginalType.DECIMAL) .named("c_int"), Types - .primitive(PrimitiveTypeName.INT64, Repetition.OPTIONAL) - .precision(10) - .scale(0) - .as(OriginalType.DECIMAL) - .named("c_int"), - Types - .primitive(PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, Repetition.OPTIONAL) - .precision(23) + .primitive(PrimitiveTypeName.INT32, Repetition.OPTIONAL) + .precision(9) .scale(0) - .length(10) .as(OriginalType.DECIMAL) .named("c_int"), new PrimitiveType(Repetition.REQUIRED, PrimitiveType.PrimitiveTypeName.INT64, "c_long"), Types .primitive(PrimitiveTypeName.INT64, Repetition.OPTIONAL) .precision(18) - .scale(9) - .as(OriginalType.DECIMAL) - .named("c_long"), - Types - .primitive(PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, Repetition.OPTIONAL) - .precision(20) - .scale(3) - .length(9) + .scale(0) .as(OriginalType.DECIMAL) .named("c_long"), Types - .primitive(PrimitiveTypeName.INT32, Repetition.REQUIRED) + .primitive(PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, Repetition.REQUIRED) .precision(9) .scale(0) + .length(4) .as(OriginalType.DECIMAL) .named("c_decimal_int"), Types - .primitive(PrimitiveTypeName.INT64, Repetition.REQUIRED) + .primitive(PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, Repetition.REQUIRED) .precision(17) .scale(0) + .length(8) .as(OriginalType.DECIMAL) .named("c_decimal_long"), Types From 9540696ac92c464030d8ec4fda3405f670f8588b Mon Sep 17 00:00:00 2001 From: Muhammet Orazov Date: Mon, 11 Feb 2019 11:04:08 +0100 Subject: [PATCH 18/20] ExaIterator cannot emit Scala `math.BigDecimal`. Change it into `java.math.BigDecimal`. 
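As a minimal illustration (hypothetical values) of why the change is needed:
the Scala wrapper is a different class from java.math.BigDecimal, so the
converter below now builds the Java type directly:

    // scala.math.BigDecimal wraps a java.math.BigDecimal; per the note above,
    // only the Java class can be handed to the Exasol iterator.
    val scalaDecimal: scala.math.BigDecimal = scala.math.BigDecimal("1337.42")
    val javaDecimal: java.math.BigDecimal   = scalaDecimal.bigDecimal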
--- .../scala/com/exasol/cloudetl/parquet/RowReadSupport.scala | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/main/scala/com/exasol/cloudetl/parquet/RowReadSupport.scala b/src/main/scala/com/exasol/cloudetl/parquet/RowReadSupport.scala index 9d4d87b4..0d00d873 100644 --- a/src/main/scala/com/exasol/cloudetl/parquet/RowReadSupport.scala +++ b/src/main/scala/com/exasol/cloudetl/parquet/RowReadSupport.scala @@ -1,7 +1,5 @@ package com.exasol.cloudetl.parquet -import java.math.BigInteger -import java.math.MathContext import java.nio.ByteOrder import com.exasol.cloudetl.data.Row @@ -190,8 +188,8 @@ class RowRootConverter(schema: GroupType) extends GroupConverter { parent.currentResult.update(index, value) override def addBinary(value: Binary): Unit = { - val bi = new BigInteger(value.getBytes) - val bd = BigDecimal.apply(bi, scale, new MathContext(precision)) + val bi = new java.math.BigInteger(value.getBytes) + val bd = new java.math.BigDecimal(bi, scale, new java.math.MathContext(precision)) parent.currentResult.update(index, bd) } } From 7466de14b4201a1dbf7635fffc0443399582d371 Mon Sep 17 00:00:00 2001 From: Muhammet Orazov Date: Mon, 11 Feb 2019 11:05:26 +0100 Subject: [PATCH 19/20] Add fs.LocalFileSystem into configuration S3AFileSystem somehow requires local temp directory, before uploading to S3. This currently only tested for AWS S3. I will update this for GCP or Azure if they require this filesystem when performing tests for those platforms. --- src/main/scala/com/exasol/cloudetl/bucket/Bucket.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/src/main/scala/com/exasol/cloudetl/bucket/Bucket.scala b/src/main/scala/com/exasol/cloudetl/bucket/Bucket.scala index 68cc77f2..92800945 100644 --- a/src/main/scala/com/exasol/cloudetl/bucket/Bucket.scala +++ b/src/main/scala/com/exasol/cloudetl/bucket/Bucket.scala @@ -36,6 +36,7 @@ final case class S3Bucket(path: String, params: Map[String, String]) extends Buc validate() val conf = new Configuration() + conf.set("fs.file.impl", classOf[org.apache.hadoop.fs.LocalFileSystem].getName) conf.set("fs.s3a.impl", classOf[org.apache.hadoop.fs.s3a.S3AFileSystem].getName) conf.set("fs.s3a.endpoint", Bucket.requiredParam(params, "S3_ENDPOINT")) conf.set("fs.s3a.access.key", Bucket.requiredParam(params, "S3_ACCESS_KEY")) From 502300020411f23372ae643b94b04d813f5910b9 Mon Sep 17 00:00:00 2001 From: Muhammet Orazov Date: Mon, 11 Feb 2019 16:32:40 +0100 Subject: [PATCH 20/20] Fix bug when converting date into days (since epoch) and back An example date was: `0001-01-01`, or in general days before 1970. After writing this `0001-01-01` as a days since epoch and when reading back I was getting `0001-12-31`. The solution was to incorporate the timezone offset millis and take the *floor* of the millis per day. 
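java.sql.Date#getTime is midnight in the JVM's local timezone, and for dates
before 1970 the epoch millis are negative, so plain integer division truncates
toward zero instead of rounding down; a sketch of the floor-based conversion
described above (illustrative only, the actual fix is in DateTimeUtil below):

    import java.sql.Date
    import java.time.ZoneId
    import java.util.TimeZone

    val MillisPerDay: Long = 24L * 60 * 60 * 1000L

    // Naive version: truncation toward zero is wrong for negative millis,
    // e.g. for a date such as 0001-01-01.
    def daysTruncated(date: Date): Long = date.getTime / MillisPerDay

    // Floor-based version: apply the local timezone offset, then round down,
    // which also handles negative values correctly.
    def daysFloored(date: Date): Long = {
      val millisUtc = date.getTime
      val millis    = millisUtc + TimeZone.getTimeZone(ZoneId.systemDefault).getOffset(millisUtc)
      Math.floorDiv(millis, MillisPerDay)
    }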
--- .../exasol/cloudetl/util/DateTimeUtil.scala | 11 ++-- .../cloudetl/util/DateTimeUtilSuite.scala | 51 +++++++++++++++++++ 2 files changed, 57 insertions(+), 5 deletions(-) create mode 100644 src/test/scala/com/exasol/cloudetl/util/DateTimeUtilSuite.scala diff --git a/src/main/scala/com/exasol/cloudetl/util/DateTimeUtil.scala b/src/main/scala/com/exasol/cloudetl/util/DateTimeUtil.scala index 5a3f23ee..84df7c37 100644 --- a/src/main/scala/com/exasol/cloudetl/util/DateTimeUtil.scala +++ b/src/main/scala/com/exasol/cloudetl/util/DateTimeUtil.scala @@ -3,20 +3,20 @@ package com.exasol.cloudetl.util import java.sql.Date import java.sql.Timestamp import java.time._ -import java.time.temporal.ChronoUnit +import java.util.TimeZone /** * Helper functions to convert date time values */ object DateTimeUtil { // scalastyle:off magic.number - val UnixEpochDate: LocalDate = LocalDate.of(1970, 1, 1) val UnixEpochDateTime: LocalDateTime = LocalDateTime.of(1970, 1, 1, 0, 0, 0) // scalastyle:on magic.number val JULIAN_DAY_OF_EPOCH: Long = 2440588 val SECONDS_PER_DAY: Long = 60 * 60 * 24L val MILLIS_PER_SECOND: Long = 1000L + val MILLIS_PER_DAY: Long = SECONDS_PER_DAY * MILLIS_PER_SECOND val MICROS_PER_MILLIS: Long = 1000L val MICROS_PER_SECOND: Long = MICROS_PER_MILLIS * MILLIS_PER_SECOND val MICROS_PER_DAY: Long = MICROS_PER_SECOND * SECONDS_PER_DAY @@ -62,10 +62,11 @@ object DateTimeUtil { } /** Returns the number of days since unix epoch */ + @SuppressWarnings(Array("org.wartremover.contrib.warts.OldTime")) def daysSinceEpoch(date: Date): Long = { - val localDate = Instant.ofEpochMilli(date.getTime).atZone(ZoneId.systemDefault).toLocalDate - val days = ChronoUnit.DAYS.between(UnixEpochDate, localDate) - days + val millisUtc = date.getTime + val millis = millisUtc + (TimeZone.getTimeZone(ZoneId.systemDefault).getOffset(millisUtc)) + Math.floor(millis.toDouble / MILLIS_PER_DAY).toLong } /** Returns a [[java.sql.Date]] date given the days since epoch */ diff --git a/src/test/scala/com/exasol/cloudetl/util/DateTimeUtilSuite.scala b/src/test/scala/com/exasol/cloudetl/util/DateTimeUtilSuite.scala new file mode 100644 index 00000000..bc6e65db --- /dev/null +++ b/src/test/scala/com/exasol/cloudetl/util/DateTimeUtilSuite.scala @@ -0,0 +1,51 @@ +package com.exasol.cloudetl.util + +import java.sql.Date +import java.text.SimpleDateFormat +import java.util.Locale + +import org.scalatest.FunSuite +import org.scalatest.Matchers + +@SuppressWarnings(Array("org.wartremover.contrib.warts.OldTime")) +class DateTimeUtilSuite extends FunSuite with Matchers { + + final def daysSinceEpochToDate(dt: Date): Unit = { + val newDT = DateTimeUtil.daysToDate(DateTimeUtil.daysSinceEpoch(dt)) + assert(dt.toString === newDT.toString) + () + } + + test("from java.sql.Date to days (since epoch) and back to java.sql.Date") { + val df1 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US) + val df2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss z", Locale.US) + + val testDate = Seq( + new Date(100), + new Date(df1.parse("1776-07-04 10:30:00").getTime), + new Date(df2.parse("1776-07-04 18:30:00 UTC").getTime), + Date.valueOf("1912-05-05"), + Date.valueOf("1969-01-01"), + new Date(df1.parse("1969-01-01 00:00:00").getTime), + new Date(df2.parse("1969-01-01 00:00:00 UTC").getTime), + new Date(df1.parse("1969-01-01 00:00:01").getTime), + new Date(df2.parse("1969-01-01 00:00:01 UTC").getTime), + new Date(df1.parse("1969-12-31 23:59:59").getTime), + new Date(df2.parse("1969-12-31 23:59:59 UTC").getTime), + Date.valueOf("1970-01-01"), + new 
Date(df1.parse("1970-01-01 00:00:00").getTime), + new Date(df2.parse("1970-01-01 00:00:00 UTC").getTime), + new Date(df1.parse("1970-01-01 00:00:01").getTime), + new Date(df2.parse("1970-01-01 00:00:01 UTC").getTime), + new Date(df1.parse("1989-11-09 11:59:59").getTime), + new Date(df2.parse("1989-11-09 19:59:59 UTC").getTime), + Date.valueOf("2019-02-10") + ) + + testDate.foreach { case dt => daysSinceEpochToDate(dt) } + } + + test("correctly converts date `0001-01-01` to days and back to date") { + daysSinceEpochToDate(Date.valueOf("0001-01-01")) + } +}