diff --git a/README.md b/README.md
index 34039990..aee22c8d 100644
--- a/README.md
+++ b/README.md
@@ -2,8 +2,13 @@
 [![Build Status][travis-badge]][travis-link]
 [![Codecov][codecov-badge]][codecov-link]
+[![GitHub Latest Release][gh-release-badge]][gh-release-link]
 
-🛈 Please note that this is an open source project which is officially supported by Exasol. For any question, you can contact our support team.
+
+🛈 Please note that this is an open
+source project which is officially supported by Exasol. For any question, you
+can contact our support team.
+
 
## Table of Contents @@ -52,6 +57,8 @@ Please change required parameters. CREATE SCHEMA ETL; OPEN SCHEMA ETL; +-- Import related scripts + CREATE OR REPLACE JAVA SET SCRIPT IMPORT_PATH(...) EMITS (...) AS %scriptclass com.exasol.cloudetl.scriptclasses.ImportPath; %jar /buckets/bfsdefault/bucket1/cloud-storage-etl-udfs-{VERSION}.jar; @@ -68,6 +75,18 @@ EMITS (filename VARCHAR(200), partition_index VARCHAR(100)) AS %scriptclass com.exasol.cloudetl.scriptclasses.ImportMetadata; %jar /buckets/bfsdefault/bucket1/cloud-storage-etl-udfs-{VERSION}.jar; / + +-- Export related scripts + +CREATE OR REPLACE JAVA SET SCRIPT EXPORT_PATH(...) EMITS (...) AS +%scriptclass com.exasol.cloudetl.scriptclasses.ExportPath; +%jar /buckets/bfsdefault/bucket1/cloud-storage-etl-udfs-{VERSION}.jar; +/ + +CREATE OR REPLACE JAVA SET SCRIPT EXPORT_TABLE(...) EMITS (ROWS_AFFECTED INT) AS +%scriptclass com.exasol.cloudetl.scriptclasses.ExportTable; +%jar /buckets/bfsdefault/bucket1/cloud-storage-etl-udfs-{VERSION}.jar; +/ ``` ### Import data from cloud storages @@ -77,8 +96,8 @@ Please follow steps below in order to import from cloud strorages. #### Create an Exasol schema and table ```sql -CREATE SCHEMA TEST; -OPEN SCHEMA TEST; +CREATE SCHEMA RETAIL; +OPEN SCHEMA RETAIL; DROP TABLE IF EXISTS SALES_POSITIONS; @@ -151,6 +170,20 @@ FROM SCRIPT ETL.IMPORT_PATH WITH SELECT * FROM SALES_POSITIONS LIMIT 10; ``` +#### Export to AWS S3 + +```sql +EXPORT SALES_POSITIONS +INTO SCRIPT ETL.EXPORT_PATH WITH + BUCKET_PATH = 's3a://exa-mo-frankfurt/export/retail/sales_positions/' + S3_ACCESS_KEY = 'MY_AWS_ACCESS_KEY' + S3_SECRET_KEY = 'MY_AWS_SECRET_KEY' + S3_ENDPOINT = 's3.MY_REGION.amazonaws.com' + PARALLELISM = 'nproc()'; + +-- MY_REGION is one of AWS regions, for example, eu-central-1 +``` + ## Building from Source Clone the repository, @@ -174,6 +207,8 @@ The packaged jar should be located at [travis-link]: https://travis-ci.org/exasol/cloud-storage-etl-udfs [codecov-badge]: https://codecov.io/gh/exasol/cloud-storage-etl-udfs/branch/master/graph/badge.svg [codecov-link]: https://codecov.io/gh/exasol/cloud-storage-etl-udfs +[gh-release-badge]: https://img.shields.io/github/release/exasol/cloud-storage-etl-udfs.svg +[gh-release-link]: https://github.com/exasol/cloud-storage-etl-udfs/releases/latest [exasol]: https://www.exasol.com/en/ [s3]: https://aws.amazon.com/s3/ [gcs]: https://cloud.google.com/storage/ diff --git a/project/Compilation.scala b/project/Compilation.scala index ab971072..53d8909e 100644 --- a/project/Compilation.scala +++ b/project/Compilation.scala @@ -124,6 +124,7 @@ object Compilation { ) val WartremoverTestFlags: Seq[Wart] = ExtraWartremoverFlags ++ Warts.allBut( + Wart.Any, Wart.NonUnitStatements, Wart.Null ) diff --git a/src/main/scala/com/exasol/cloudetl/bucket/Bucket.scala b/src/main/scala/com/exasol/cloudetl/bucket/Bucket.scala index 68cc77f2..92800945 100644 --- a/src/main/scala/com/exasol/cloudetl/bucket/Bucket.scala +++ b/src/main/scala/com/exasol/cloudetl/bucket/Bucket.scala @@ -36,6 +36,7 @@ final case class S3Bucket(path: String, params: Map[String, String]) extends Buc validate() val conf = new Configuration() + conf.set("fs.file.impl", classOf[org.apache.hadoop.fs.LocalFileSystem].getName) conf.set("fs.s3a.impl", classOf[org.apache.hadoop.fs.s3a.S3AFileSystem].getName) conf.set("fs.s3a.endpoint", Bucket.requiredParam(params, "S3_ENDPOINT")) conf.set("fs.s3a.access.key", Bucket.requiredParam(params, "S3_ACCESS_KEY")) diff --git a/src/main/scala/com/exasol/cloudetl/data/ExaColumnInfo.scala 
b/src/main/scala/com/exasol/cloudetl/data/ExaColumnInfo.scala new file mode 100644 index 00000000..f0ea6f0c --- /dev/null +++ b/src/main/scala/com/exasol/cloudetl/data/ExaColumnInfo.scala @@ -0,0 +1,12 @@ +package com.exasol.cloudetl.data + +/** An Exasol table column information */ +@SuppressWarnings(Array("org.wartremover.warts.DefaultArguments")) +final case class ExaColumnInfo( + name: String, + `type`: Class[_], + precision: Int = 0, + scale: Int = 0, + length: Int = 0, + isNullable: Boolean = true +) diff --git a/src/main/scala/com/exasol/cloudetl/data/Row.scala b/src/main/scala/com/exasol/cloudetl/data/Row.scala new file mode 100644 index 00000000..aafe5ec1 --- /dev/null +++ b/src/main/scala/com/exasol/cloudetl/data/Row.scala @@ -0,0 +1,3 @@ +package com.exasol.cloudetl.data + +final case class Row(val values: Seq[Any]) diff --git a/src/main/scala/com/exasol/cloudetl/parquet/ParquetRowWriter.scala b/src/main/scala/com/exasol/cloudetl/parquet/ParquetRowWriter.scala new file mode 100644 index 00000000..a07e72e4 --- /dev/null +++ b/src/main/scala/com/exasol/cloudetl/parquet/ParquetRowWriter.scala @@ -0,0 +1,41 @@ +package com.exasol.cloudetl.parquet + +import com.exasol.cloudetl.data.Row + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path +import org.apache.parquet.column.ParquetProperties +import org.apache.parquet.hadoop.ParquetFileWriter +import org.apache.parquet.hadoop.ParquetWriter +import org.apache.parquet.hadoop.api.WriteSupport +import org.apache.parquet.schema.MessageType + +object ParquetRowWriter { + + private[this] class Builder(path: Path, messageType: MessageType) + extends ParquetWriter.Builder[Row, Builder](path) { + + override def getWriteSupport(conf: Configuration): WriteSupport[Row] = + new RowWriteSupport(messageType) + + override def self(): Builder = this + } + + def apply( + path: Path, + conf: Configuration, + messageType: MessageType, + options: ParquetWriteOptions + ): ParquetWriter[Row] = + new Builder(path, messageType) + .withRowGroupSize(options.blockSize) + .withPageSize(options.pageSize) + .withCompressionCodec(options.compressionCodec) + .withDictionaryEncoding(options.enableDictionaryEncoding) + .withValidation(options.enableValidation) + .withWriteMode(ParquetFileWriter.Mode.CREATE) + .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_1_0) + .withConf(conf) + .build() + +} diff --git a/src/main/scala/com/exasol/cloudetl/source/ParquetSource.scala b/src/main/scala/com/exasol/cloudetl/parquet/ParquetSource.scala similarity index 94% rename from src/main/scala/com/exasol/cloudetl/source/ParquetSource.scala rename to src/main/scala/com/exasol/cloudetl/parquet/ParquetSource.scala index ee3083d8..49f6a83d 100644 --- a/src/main/scala/com/exasol/cloudetl/source/ParquetSource.scala +++ b/src/main/scala/com/exasol/cloudetl/parquet/ParquetSource.scala @@ -1,10 +1,9 @@ -package com.exasol.cloudetl.source +package com.exasol.cloudetl.parquet import scala.collection.JavaConverters._ import scala.language.reflectiveCalls -import com.exasol.cloudetl.row.Row -import com.exasol.cloudetl.row.RowReadSupport +import com.exasol.cloudetl.data.Row import com.exasol.cloudetl.util.FsUtil import org.apache.hadoop.conf.Configuration diff --git a/src/main/scala/com/exasol/cloudetl/parquet/ParquetWriteOptions.scala b/src/main/scala/com/exasol/cloudetl/parquet/ParquetWriteOptions.scala new file mode 100644 index 00000000..9ad6db03 --- /dev/null +++ b/src/main/scala/com/exasol/cloudetl/parquet/ParquetWriteOptions.scala @@ -0,0 +1,35 @@ 
+package com.exasol.cloudetl.parquet
+
+import org.apache.parquet.hadoop.ParquetWriter
+import org.apache.parquet.hadoop.metadata.CompressionCodecName
+
+final case class ParquetWriteOptions(
+  blockSize: Int,
+  pageSize: Int,
+  compressionCodec: CompressionCodecName,
+  enableDictionaryEncoding: Boolean,
+  enableValidation: Boolean
+)
+
+object ParquetWriteOptions {
+
+  @SuppressWarnings(
+    Array("org.wartremover.warts.Overloading", "org.danielnixon.extrawarts.StringOpsPartial")
+  )
+  def apply(params: Map[String, String]): ParquetWriteOptions = {
+    val compressionCodec = params.getOrElse("PARQUET_COMPRESSION_CODEC", "").toUpperCase() match {
+      case "SNAPPY" => CompressionCodecName.SNAPPY
+      case "GZIP" => CompressionCodecName.GZIP
+      case "LZO" => CompressionCodecName.LZO
+      case _ => CompressionCodecName.UNCOMPRESSED
+    }
+    val blockSize =
+      params.get("PARQUET_BLOCK_SIZE").fold(ParquetWriter.DEFAULT_BLOCK_SIZE)(_.toInt)
+    val pageSize = params.get("PARQUET_PAGE_SIZE").fold(ParquetWriter.DEFAULT_PAGE_SIZE)(_.toInt)
+    val dictionary = params.get("PARQUET_DICTIONARY_ENCODING").fold(true)(_.toBoolean)
+    val validation = params.get("PARQUET_VALIDATION").fold(true)(_.toBoolean)
+
+    ParquetWriteOptions(blockSize, pageSize, compressionCodec, dictionary, validation)
+  }
+
+}
diff --git a/src/main/scala/com/exasol/cloudetl/parquet/RowReadSupport.scala b/src/main/scala/com/exasol/cloudetl/parquet/RowReadSupport.scala
new file mode 100644
index 00000000..0d00d873
--- /dev/null
+++ b/src/main/scala/com/exasol/cloudetl/parquet/RowReadSupport.scala
@@ -0,0 +1,220 @@
+package com.exasol.cloudetl.parquet
+
+import java.nio.ByteOrder
+
+import com.exasol.cloudetl.data.Row
+import com.exasol.cloudetl.util.DateTimeUtil
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.parquet.hadoop.api.ReadSupport
+import org.apache.parquet.hadoop.api.ReadSupport.ReadContext
+import org.apache.parquet.io.api.Binary
+import org.apache.parquet.io.api.Converter
+import org.apache.parquet.io.api.GroupConverter
+import org.apache.parquet.io.api.PrimitiveConverter
+import org.apache.parquet.io.api.RecordMaterializer
+import org.apache.parquet.schema.GroupType
+import org.apache.parquet.schema.MessageType
+import org.apache.parquet.schema.OriginalType
+import org.apache.parquet.schema.PrimitiveType
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
+import org.apache.parquet.schema.Type
+
+@SuppressWarnings(Array("org.wartremover.contrib.warts.UnsafeInheritance"))
+class RowReadSupport extends ReadSupport[Row] {
+
+  override def prepareForRead(
+    conf: Configuration,
+    metadata: java.util.Map[String, String],
+    messageType: MessageType,
+    readContext: ReadContext
+  ): RecordMaterializer[Row] =
+    new RowRecordMaterializer(messageType, readContext)
+
+  override def init(
+    conf: Configuration,
+    metadata: java.util.Map[String, String],
+    messageType: MessageType
+  ): ReadSupport.ReadContext = {
+    val projection = conf.get(ReadSupport.PARQUET_READ_SCHEMA)
+    val requestedSchema = ReadSupport.getSchemaForRead(messageType, projection)
+    new ReadSupport.ReadContext(requestedSchema)
+  }
+}
+
+@SuppressWarnings(Array("org.wartremover.contrib.warts.UnsafeInheritance"))
+class RowRecordMaterializer(messageType: MessageType, readContext: ReadContext)
+  extends RecordMaterializer[Row] {
+
+  override val getRootConverter: RowRootConverter = new RowRootConverter(messageType)
+  override def skipCurrentRecord(): Unit = getRootConverter.start()
+  override def getCurrentRecord: Row =
Row(getRootConverter.currentResult.toSeq) +} + +@SuppressWarnings( + Array("org.wartremover.warts.Var", "org.wartremover.contrib.warts.UnsafeInheritance") +) +class RowRootConverter(schema: GroupType) extends GroupConverter { + private val size = schema.getFieldCount + private var values: Array[Any] = Array.ofDim[Any](size) + private final val converters: Array[Converter] = { + val arr = Array.ofDim[Converter](size) + for { i <- 0 until size } { + arr(i) = createNewConverter(schema.getType(i), i) + } + arr + } + + private def createNewConverter(tpe: Type, idx: Int): Converter = { + if (!tpe.isPrimitive()) { + throw new UnsupportedOperationException("Currently only primitive types are supported") + } + makeReader(tpe.asPrimitiveType(), idx) + } + + def currentResult(): Array[Any] = + values + + override def getConverter(idx: Int): Converter = + converters(idx) + + override def start(): Unit = + values = Array.ofDim(converters.size) + + override def end(): Unit = {} + + private def makeReader(primitiveType: PrimitiveType, idx: Int): Converter = { + val typeName = primitiveType.getPrimitiveTypeName + val originalType = primitiveType.getOriginalType + + typeName match { + case PrimitiveTypeName.INT32 => + originalType match { + case OriginalType.DATE => new RowDateConverter(this, idx) + case OriginalType.DECIMAL => + val decimalMetadata = primitiveType.getDecimalMetadata + new RowDecimalConverter( + this, + idx, + decimalMetadata.getPrecision, + decimalMetadata.getScale + ) + case _ => new RowPrimitiveConverter(this, idx) + } + case PrimitiveTypeName.BOOLEAN => new RowPrimitiveConverter(this, idx) + case PrimitiveTypeName.DOUBLE => new RowPrimitiveConverter(this, idx) + case PrimitiveTypeName.FLOAT => new RowPrimitiveConverter(this, idx) + + case PrimitiveTypeName.BINARY => + originalType match { + case OriginalType.UTF8 => new RowStringConverter(this, idx) + case _ => new RowPrimitiveConverter(this, idx) + } + case PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY => + originalType match { + case OriginalType.DECIMAL => + val decimalMetadata = primitiveType.getDecimalMetadata + new RowDecimalConverter( + this, + idx, + decimalMetadata.getPrecision, + decimalMetadata.getScale + ) + case _ => new RowPrimitiveConverter(this, idx) + } + case PrimitiveTypeName.INT64 => + originalType match { + case OriginalType.TIMESTAMP_MILLIS => new RowTimestampConverter(this, idx) + case OriginalType.DECIMAL => + val decimalMetadata = primitiveType.getDecimalMetadata + new RowDecimalConverter( + this, + idx, + decimalMetadata.getPrecision, + decimalMetadata.getScale + ) + case _ => new RowPrimitiveConverter(this, idx) + } + + case PrimitiveTypeName.INT96 => new RowTimestampConverter(this, idx) + + case _ => + throw new UnsupportedOperationException( + s"Parquet type '$typeName' cannot be read into Exasol type." 
+ ) + } + } + + private final class RowPrimitiveConverter(val parent: RowRootConverter, val index: Int) + extends PrimitiveConverter { + + override def addBinary(value: Binary): Unit = + parent.currentResult.update(index, value.getBytes()) + + override def addBoolean(value: Boolean): Unit = + parent.currentResult.update(index, value) + + override def addDouble(value: Double): Unit = + parent.currentResult.update(index, value) + + override def addFloat(value: Float): Unit = + parent.currentResult.update(index, value) + + override def addInt(value: Int): Unit = + parent.currentResult.update(index, value) + + override def addLong(value: Long): Unit = + parent.currentResult.update(index, value) + } + + final class RowStringConverter(val parent: RowRootConverter, val index: Int) + extends PrimitiveConverter { + override def addBinary(value: Binary): Unit = + parent.currentResult.update(index, value.toStringUsingUTF8()) + } + + private final class RowDecimalConverter( + val parent: RowRootConverter, + val index: Int, + precision: Int, + scale: Int + ) extends PrimitiveConverter { + // Converts decimals stored as INT32 + override def addInt(value: Int): Unit = + parent.currentResult.update(index, value) + + // Converts decimals stored as INT64 + override def addLong(value: Long): Unit = + parent.currentResult.update(index, value) + + override def addBinary(value: Binary): Unit = { + val bi = new java.math.BigInteger(value.getBytes) + val bd = new java.math.BigDecimal(bi, scale, new java.math.MathContext(precision)) + parent.currentResult.update(index, bd) + } + } + + private final class RowTimestampConverter(val parent: RowRootConverter, val index: Int) + extends PrimitiveConverter { + + override def addBinary(value: Binary): Unit = { + val buf = value.toByteBuffer.order(ByteOrder.LITTLE_ENDIAN) + val nanos = buf.getLong + val days = buf.getInt + val micros = DateTimeUtil.getMicrosFromJulianDay(days, nanos) + val ts = DateTimeUtil.getTimestampFromMicros(micros) + + parent.currentResult.update(index, ts) + } + } + + private final class RowDateConverter(val parent: RowRootConverter, val index: Int) + extends PrimitiveConverter { + + override def addInt(value: Int): Unit = { + val date = DateTimeUtil.daysToDate(value.toLong) + parent.currentResult.update(index, date) + } + } + +} diff --git a/src/main/scala/com/exasol/cloudetl/parquet/RowWriteSupport.scala b/src/main/scala/com/exasol/cloudetl/parquet/RowWriteSupport.scala new file mode 100644 index 00000000..6d3e8109 --- /dev/null +++ b/src/main/scala/com/exasol/cloudetl/parquet/RowWriteSupport.scala @@ -0,0 +1,219 @@ +package com.exasol.cloudetl.parquet + +import java.nio.ByteBuffer +import java.nio.ByteOrder + +import scala.collection.JavaConverters._ + +import com.exasol.cloudetl.data.Row +import com.exasol.cloudetl.util.DateTimeUtil +import com.exasol.cloudetl.util.SchemaUtil + +import org.apache.hadoop.conf.Configuration +import org.apache.parquet.hadoop.api.WriteSupport +import org.apache.parquet.hadoop.api.WriteSupport.FinalizedWriteContext +import org.apache.parquet.io.api.Binary +import org.apache.parquet.io.api.RecordConsumer +import org.apache.parquet.schema.MessageType +import org.apache.parquet.schema.OriginalType +import org.apache.parquet.schema.PrimitiveType +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName + +/** + * A Parquet [[org.apache.parquet.hadoop.api.WriteSupport]] implementation that writes + * [[com.exasol.cloudetl.data.Row]] as a Parquet data. 
+ * + * This is mostly adapted from Spark codebase: + * - org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport + * + */ +@SuppressWarnings( + Array( + "org.wartremover.warts.AsInstanceOf", + "org.wartremover.warts.Null", + "org.wartremover.warts.Var" + ) +) +class RowWriteSupport(schema: MessageType) extends WriteSupport[Row] { + + // The number bytes required for timestamp buffer in Parquet + private final val TIMESTAMP_MAX_BYTE_SIZE: Int = 12 + + // This is a type that is responsible for writing a value in Row values index to the + // RecordConsumer + private type RowValueWriter = (Row, Int) => Unit + + // A list of `RowValueWriter`-s for each field type of Parquet `schema` + private var rootFieldWriters: Array[RowValueWriter] = _ + + // A Parquet RecordConsumer that all values of a Row will be written + private var recordConsumer: RecordConsumer = _ + + // Reusable byte array used to write timestamps as Parquet INT96 values + private val timestampBuffer = new Array[Byte](TIMESTAMP_MAX_BYTE_SIZE) + + // Reusable byte array used to write decimal values as Parquet FIXED_LEN_BYTE_ARRAY values + private val decimalBuffer = + new Array[Byte](SchemaUtil.PRECISION_TO_BYTE_SIZE(SchemaUtil.DECIMAL_MAX_PRECISION - 1)) + + final override def init(configuration: Configuration): WriteSupport.WriteContext = { + this.rootFieldWriters = schema.getFields.asScala + .map { + case field => + makeWriter(field.asPrimitiveType()) + } + .toArray[RowValueWriter] + + new WriteSupport.WriteContext(schema, new java.util.HashMap()) + } + + final override def prepareForWrite(record: RecordConsumer): Unit = + this.recordConsumer = record + + final override def write(row: Row): Unit = + consumeMessage { + writeFields(row, schema, rootFieldWriters) + } + + final override def finalizeWrite(): FinalizedWriteContext = + new FinalizedWriteContext(new java.util.HashMap()) + + private def writeFields(row: Row, schema: MessageType, writers: Array[RowValueWriter]): Unit = { + var idx = 0 + while (idx < schema.getFieldCount) { + val fieldType = schema.getType(idx) + val fieldName = fieldType.getName() + if (row.values(idx) != null) { + consumeField(fieldName, idx) { + writers(idx).apply(row, idx) + } + } + idx += 1 + } + } + + private def consumeMessage(fn: => Unit): Unit = { + recordConsumer.startMessage() + fn + recordConsumer.endMessage() + } + + private def consumeField(field: String, index: Int)(fn: => Unit): Unit = { + recordConsumer.startField(field, index) + fn + recordConsumer.endField(field, index) + } + + private def makeWriter(primitiveType: PrimitiveType): RowValueWriter = { + val typeName = primitiveType.getPrimitiveTypeName + val originalType = primitiveType.getOriginalType + + typeName match { + case PrimitiveTypeName.BOOLEAN => + (row: Row, index: Int) => + recordConsumer.addBoolean(row.values(index).asInstanceOf[Boolean]) + + case PrimitiveTypeName.INT32 => + originalType match { + case OriginalType.DATE => + makeDateWriter() + case _ => + (row: Row, index: Int) => + recordConsumer.addInteger(row.values(index).asInstanceOf[Integer]) + } + + case PrimitiveTypeName.INT64 => + (row: Row, index: Int) => + recordConsumer.addLong(row.values(index).asInstanceOf[Long]) + + case PrimitiveTypeName.FLOAT => + (row: Row, index: Int) => + recordConsumer.addFloat(row.values(index).asInstanceOf[Double].floatValue) + + case PrimitiveTypeName.DOUBLE => + (row: Row, index: Int) => + recordConsumer.addDouble(row.values(index).asInstanceOf[Double]) + + case PrimitiveTypeName.BINARY => + (row: Row, index: Int) => 
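+            // Cast the row value to String and emit its bytes as a Parquet BINARY value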
+ recordConsumer.addBinary( + Binary.fromReusedByteArray(row.values(index).asInstanceOf[String].getBytes) + ) + + case PrimitiveTypeName.INT96 => + makeTimestampWriter() + + case PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY if originalType == OriginalType.DECIMAL => + val decimalMetadata = primitiveType.getDecimalMetadata + makeDecimalWriter(decimalMetadata.getPrecision, decimalMetadata.getScale) + + case _ => throw new UnsupportedOperationException(s"Unsupported parquet type '$typeName'.") + } + } + + private def makeDateWriter(): RowValueWriter = (row: Row, index: Int) => { + // Write the number of days since unix epoch as integer + val date = row.values(index).asInstanceOf[java.sql.Date] + val days = DateTimeUtil.daysSinceEpoch(date) + + recordConsumer.addInteger(days.toInt) + } + + private def makeTimestampWriter(): RowValueWriter = (row: Row, index: Int) => { + val timestamp = row.values(index).asInstanceOf[java.sql.Timestamp] + val micros = DateTimeUtil.getMicrosFromTimestamp(timestamp) + val (days, nanos) = DateTimeUtil.getJulianDayAndNanos(micros) + + val buf = ByteBuffer.wrap(timestampBuffer) + val _ = buf.order(ByteOrder.LITTLE_ENDIAN).putLong(nanos).putInt(days) + + recordConsumer.addBinary(Binary.fromReusedByteArray(timestampBuffer)) + } + + private def makeDecimalWriter(precision: Int, scale: Int): RowValueWriter = { + require( + precision >= 1, + s"Decimal precision $precision should not be less than minimum precision 1" + ) + require( + precision <= SchemaUtil.DECIMAL_MAX_PRECISION, + s"""|Decimal precision $precision should not exceed + |max precision ${SchemaUtil.DECIMAL_MAX_PRECISION} + """.stripMargin + ) + + // The number of bytes from given the precision + val numBytes = SchemaUtil.PRECISION_TO_BYTE_SIZE(precision - 1) + + val bytesWriter = (row: Row, index: Int) => { + val decimal = row.values(index).asInstanceOf[java.math.BigDecimal] + val unscaled = decimal.unscaledValue() + val bytes = unscaled.toByteArray + val fixedLenBytesArray = + if (bytes.length == numBytes) { + // If the length of the underlying byte array of the unscaled `BigDecimal` happens to be + // `numBytes`, just reuse it, so that we don't bother copying it to `decimalBuffer`. + bytes + } else if (bytes.length < numBytes) { + // Otherwise, the length must be less than `numBytes`. In this case we copy contents of + // the underlying bytes with padding sign bytes to `decimalBuffer` to form the result + // fixed-length byte array. + + // For negatives all high bits need to be 1 hence -1 used + val signByte = if (unscaled.signum < 0) -1: Byte else 0: Byte + java.util.Arrays.fill(decimalBuffer, 0, numBytes - bytes.length, signByte) + System.arraycopy(bytes, 0, decimalBuffer, numBytes - bytes.length, bytes.length) + decimalBuffer + } else { + throw new IllegalStateException( + s"The precision $precision is too small for decimal value." 
+ ) + } + + recordConsumer.addBinary(Binary.fromReusedByteArray(fixedLenBytesArray, 0, numBytes)) + } + + bytesWriter + } + +} diff --git a/src/main/scala/com/exasol/cloudetl/row/Row.scala b/src/main/scala/com/exasol/cloudetl/row/Row.scala deleted file mode 100644 index a7ec53be..00000000 --- a/src/main/scala/com/exasol/cloudetl/row/Row.scala +++ /dev/null @@ -1,102 +0,0 @@ -package com.exasol.cloudetl.row - -import org.apache.hadoop.conf.Configuration -import org.apache.parquet.hadoop.api.ReadSupport -import org.apache.parquet.hadoop.api.ReadSupport.ReadContext -import org.apache.parquet.io.api.Binary -import org.apache.parquet.io.api.Converter -import org.apache.parquet.io.api.GroupConverter -import org.apache.parquet.io.api.PrimitiveConverter -import org.apache.parquet.io.api.RecordMaterializer -import org.apache.parquet.schema.GroupType -import org.apache.parquet.schema.MessageType -import org.apache.parquet.schema.Type - -final case class Row(val values: Seq[Any]) - -@SuppressWarnings(Array("org.wartremover.contrib.warts.UnsafeInheritance")) -class RowReadSupport extends ReadSupport[Row] { - - override def prepareForRead( - conf: Configuration, - metadata: java.util.Map[String, String], - messageType: MessageType, - readContext: ReadContext - ): RecordMaterializer[Row] = - new RowRecordMaterializer(messageType, readContext) - - override def init( - conf: Configuration, - metadata: java.util.Map[String, String], - messageType: MessageType - ): ReadSupport.ReadContext = { - val projection = conf.get(ReadSupport.PARQUET_READ_SCHEMA) - val requestedSchema = ReadSupport.getSchemaForRead(messageType, projection) - new ReadSupport.ReadContext(requestedSchema) - } -} - -@SuppressWarnings(Array("org.wartremover.contrib.warts.UnsafeInheritance")) -class RowRecordMaterializer(messageType: MessageType, readContext: ReadContext) - extends RecordMaterializer[Row] { - - override val getRootConverter: RowRootConverter = new RowRootConverter(messageType) - override def skipCurrentRecord(): Unit = getRootConverter.start() - override def getCurrentRecord: Row = Row(getRootConverter.currentResult.toSeq) -} - -@SuppressWarnings( - Array("org.wartremover.warts.Var", "org.wartremover.contrib.warts.UnsafeInheritance") -) -class RowRootConverter(schema: GroupType) extends GroupConverter { - private val size = schema.getFieldCount - private var values: Array[Any] = Array.ofDim[Any](size) - private final val converters: Array[Converter] = { - val arr = Array.ofDim[Converter](size) - for { i <- 0 until size } { - arr(i) = createNewConverter(schema.getType(i), i) - } - arr - } - - private def createNewConverter(tpe: Type, idx: Int): Converter = { - if (!tpe.isPrimitive()) { - throw new IllegalArgumentException("Currently only primitive types are supported") - } - new RowPrimitiveConverter(this, idx) - } - - def currentResult(): Array[Any] = - values - - override def getConverter(idx: Int): Converter = - converters(idx) - - override def start(): Unit = - values = Array.ofDim(converters.size) - - override def end(): Unit = {} - -} - -final class RowPrimitiveConverter(val parent: RowRootConverter, val index: Int) - extends PrimitiveConverter { - - override def addBinary(value: Binary): Unit = - parent.currentResult.update(index, value.toStringUsingUTF8()) - - override def addBoolean(value: Boolean): Unit = - parent.currentResult.update(index, value) - - override def addDouble(value: Double): Unit = - parent.currentResult.update(index, value) - - override def addFloat(value: Float): Unit = - 
parent.currentResult.update(index, value) - - override def addInt(value: Int): Unit = - parent.currentResult.update(index, value) - - override def addLong(value: Long): Unit = - parent.currentResult.update(index, value) -} diff --git a/src/main/scala/com/exasol/cloudetl/scriptclasses/ExportPath.scala b/src/main/scala/com/exasol/cloudetl/scriptclasses/ExportPath.scala new file mode 100644 index 00000000..ad004c80 --- /dev/null +++ b/src/main/scala/com/exasol/cloudetl/scriptclasses/ExportPath.scala @@ -0,0 +1,53 @@ +package com.exasol.cloudetl.scriptclasses + +import scala.collection.JavaConverters._ + +import com.exasol.ExaExportSpecification +import com.exasol.ExaMetadata +import com.exasol.cloudetl.bucket._ + +object ExportPath { + + def generateSqlForExportSpec(exaMeta: ExaMetadata, exaSpec: ExaExportSpecification): String = { + val params = exaSpec.getParameters.asScala.toMap + val bucket = Bucket(params) + + bucket.validate() + + val bucketPath = bucket.bucketPath + val parallelism = Bucket.optionalParam(params, "PARALLELISM", "nproc()") + val rest = Bucket.mapToStr(params) + + val scriptSchema = exaMeta.getScriptSchema + + val srcColumns = getSourceColumns(exaSpec) + val srcColumnsParam = srcColumns.mkString(".") + + s"""SELECT + | $scriptSchema.EXPORT_TABLE( + | '$bucketPath', '$rest', '$srcColumnsParam', ${srcColumns.mkString(", ")} + |) + |FROM + | DUAL + |GROUP BY + | $parallelism; + |""".stripMargin + } + + private[this] def getSourceColumns(spec: ExaExportSpecification): Seq[String] = + spec.getSourceColumnNames.asScala + .map { + case value => + // Remove quotes if present + getColumnName(value).replaceAll("\"", "") + } + + /** Given a table name dot column name syntax (myTable.colInt), return the column name. */ + private[this] def getColumnName(str: String): String = str.split("\\.") match { + case Array(colName) => colName + case Array(tblName @ _, colName) => colName + case _ => + throw new RuntimeException(s"Could not parse the column name from '$str'!") + } + +} diff --git a/src/main/scala/com/exasol/cloudetl/scriptclasses/ExportTable.scala b/src/main/scala/com/exasol/cloudetl/scriptclasses/ExportTable.scala new file mode 100644 index 00000000..9bf9687b --- /dev/null +++ b/src/main/scala/com/exasol/cloudetl/scriptclasses/ExportTable.scala @@ -0,0 +1,98 @@ +package com.exasol.cloudetl.scriptclasses + +import java.util.UUID + +import scala.collection.mutable.ListBuffer + +import com.exasol.ExaIterator +import com.exasol.ExaMetadata +import com.exasol.cloudetl.bucket.Bucket +import com.exasol.cloudetl.data.ExaColumnInfo +import com.exasol.cloudetl.data.Row +import com.exasol.cloudetl.parquet.ParquetRowWriter +import com.exasol.cloudetl.parquet.ParquetWriteOptions +import com.exasol.cloudetl.util.SchemaUtil + +import com.typesafe.scalalogging.LazyLogging +import org.apache.hadoop.fs.Path + +object ExportTable extends LazyLogging { + + def run(meta: ExaMetadata, iter: ExaIterator): Unit = { + val bucketPath = iter.getString(0) + val params = Bucket.strToMap(iter.getString(1)) + val bucket = Bucket(params) + + val srcColumnNames = iter.getString(2).split("\\.") + val firstColumnIdx = 3 + val columns = getColumns(meta, srcColumnNames, firstColumnIdx) + + val parquetFilename = generateParquetFilename(meta) + val path = new Path(bucketPath, parquetFilename) + val messageType = SchemaUtil.createParquetMessageType(columns, "exasol_export_schema") + val options = ParquetWriteOptions(params) + val writer = ParquetRowWriter(path, bucket.createConfiguration(), messageType, options) + + 
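+    // Each row pulled from the iterator below is appended to a single Parquet file; the writer is closed once the iterator is exhausted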
logger.info(s"Starting export from node = '${meta.getNodeId}' and vm = '${meta.getVmId}'") + + do { + val row = getRow(iter, firstColumnIdx, columns) + logger.debug(s"Writing row '$row'") + writer.write(row) + } while (iter.next()) + + writer.close() + logger.info(s"Finished exporting from node = '${meta.getNodeId}' and vm = '${meta.getVmId}'") + } + + private[this] def generateParquetFilename(meta: ExaMetadata): String = { + val nodeId = meta.getNodeId + val vmId = meta.getVmId + val uuidStr = UUID.randomUUID.toString.replaceAll("-", "") + s"exa_export_${nodeId}_${vmId}_$uuidStr.parquet" + } + + private[this] def getRow(iter: ExaIterator, startIdx: Int, columns: Seq[ExaColumnInfo]): Row = { + val vals = columns.zipWithIndex.map { + case (col, idx) => + SchemaUtil.exaColumnToValue(iter, startIdx + idx, col) + } + Row(values = vals) + } + + /** + * Creates a sequence of [[ExaColumnInfo]] columns using an Exasol [[ExaMetadata]] input column + * methods. + * + * Set the name of the column using `srcColumnNames` parameter. Additionally, set the precision, + * scale and length using corresponding functions on Exasol metadata for input columns. + * + * @param meta An Exasol [[ExaMetadata]] metadata + * @param srcColumnNames A sequence of column names per each input column in metadata + * @param startIdx A starting integer index to reference input column + * @return A sequence of [[ExaColumnInfo]] columns + */ + @SuppressWarnings(Array("org.wartremover.warts.MutableDataStructures")) + private[this] def getColumns( + meta: ExaMetadata, + srcColumnNames: Seq[String], + startIdx: Int + ): Seq[ExaColumnInfo] = { + val totalColumnCnt = meta.getInputColumnCount.toInt + val columns = ListBuffer[ExaColumnInfo]() + + for { idx <- startIdx until totalColumnCnt } columns.append( + ExaColumnInfo( + name = srcColumnNames(idx - startIdx), + `type` = meta.getInputColumnType(idx), + precision = meta.getInputColumnPrecision(idx).toInt, + scale = meta.getInputColumnScale(idx).toInt, + length = meta.getInputColumnLength(idx).toInt, + isNullable = true + ) + ) + + columns.toSeq + } + +} diff --git a/src/main/scala/com/exasol/cloudetl/scriptclasses/ImportFiles.scala b/src/main/scala/com/exasol/cloudetl/scriptclasses/ImportFiles.scala index b1a6fc32..ed111c83 100644 --- a/src/main/scala/com/exasol/cloudetl/scriptclasses/ImportFiles.scala +++ b/src/main/scala/com/exasol/cloudetl/scriptclasses/ImportFiles.scala @@ -5,7 +5,7 @@ import scala.collection.mutable.ListBuffer import com.exasol.ExaIterator import com.exasol.ExaMetadata import com.exasol.cloudetl.bucket._ -import com.exasol.cloudetl.source.ParquetSource +import com.exasol.cloudetl.parquet.ParquetSource import com.typesafe.scalalogging.LazyLogging import org.apache.hadoop.conf.Configuration diff --git a/src/main/scala/com/exasol/cloudetl/scriptclasses/ImportPath.scala b/src/main/scala/com/exasol/cloudetl/scriptclasses/ImportPath.scala index 062b1ae1..981615db 100644 --- a/src/main/scala/com/exasol/cloudetl/scriptclasses/ImportPath.scala +++ b/src/main/scala/com/exasol/cloudetl/scriptclasses/ImportPath.scala @@ -22,8 +22,7 @@ object ImportPath { val scriptSchema = exaMeta.getScriptSchema - s""" - |SELECT + s"""SELECT | $scriptSchema.IMPORT_FILES( | '$bucketPath', '$rest', filename |) @@ -34,7 +33,7 @@ object ImportPath { |) |GROUP BY | partition_index; - """.stripMargin + |""".stripMargin } } diff --git a/src/main/scala/com/exasol/cloudetl/util/DateTimeUtil.scala b/src/main/scala/com/exasol/cloudetl/util/DateTimeUtil.scala new file mode 100644 index 
00000000..84df7c37 --- /dev/null +++ b/src/main/scala/com/exasol/cloudetl/util/DateTimeUtil.scala @@ -0,0 +1,79 @@ +package com.exasol.cloudetl.util + +import java.sql.Date +import java.sql.Timestamp +import java.time._ +import java.util.TimeZone + +/** + * Helper functions to convert date time values + */ +object DateTimeUtil { + // scalastyle:off magic.number + val UnixEpochDateTime: LocalDateTime = LocalDateTime.of(1970, 1, 1, 0, 0, 0) + // scalastyle:on magic.number + + val JULIAN_DAY_OF_EPOCH: Long = 2440588 + val SECONDS_PER_DAY: Long = 60 * 60 * 24L + val MILLIS_PER_SECOND: Long = 1000L + val MILLIS_PER_DAY: Long = SECONDS_PER_DAY * MILLIS_PER_SECOND + val MICROS_PER_MILLIS: Long = 1000L + val MICROS_PER_SECOND: Long = MICROS_PER_MILLIS * MILLIS_PER_SECOND + val MICROS_PER_DAY: Long = MICROS_PER_SECOND * SECONDS_PER_DAY + + /** Returns a [[java.sql.Timestamp]] timestamp from number of microseconds since epoch */ + @SuppressWarnings(Array("org.wartremover.warts.Var")) + def getTimestampFromMicros(us: Long): Timestamp = { + // setNanos() will overwrite the millisecond part, so the milliseconds should be cut off at + // seconds + var seconds = us / MICROS_PER_SECOND + var micros = us % MICROS_PER_SECOND + if (micros < 0) { // setNanos() can not accept negative value + micros += MICROS_PER_SECOND + seconds -= 1 + } + val ts = new Timestamp(seconds * 1000) + ts.setNanos(micros.toInt * 1000) + + ts + } + + /** Returns the number of micros since epoch from [[java.sql.Timestamp]] */ + def getMicrosFromTimestamp(ts: Timestamp): Long = + if (ts != null) { + ts.getTime() * 1000L + (ts.getNanos().toLong / 1000) % 1000L + } else { + 0L + } + + /** Returns Julian day and nanoseconds in a day from microseconds since epoch */ + @SuppressWarnings(Array("org.wartremover.contrib.warts.ExposedTuples")) + def getJulianDayAndNanos(us: Long): (Int, Long) = { + val julian_us = us + JULIAN_DAY_OF_EPOCH * MICROS_PER_DAY + val day = julian_us / MICROS_PER_DAY + val micros = julian_us % MICROS_PER_DAY + (day.toInt, micros * 1000L) + } + + /** Returns microseconds since epoch from Julian day and nanoseconds in a day */ + def getMicrosFromJulianDay(day: Int, nanos: Long): Long = { + val seconds = (day - JULIAN_DAY_OF_EPOCH).toLong * SECONDS_PER_DAY + seconds * MICROS_PER_SECOND + nanos / 1000L + } + + /** Returns the number of days since unix epoch */ + @SuppressWarnings(Array("org.wartremover.contrib.warts.OldTime")) + def daysSinceEpoch(date: Date): Long = { + val millisUtc = date.getTime + val millis = millisUtc + (TimeZone.getTimeZone(ZoneId.systemDefault).getOffset(millisUtc)) + Math.floor(millis.toDouble / MILLIS_PER_DAY).toLong + } + + /** Returns a [[java.sql.Date]] date given the days since epoch */ + def daysToDate(days: Long): Date = { + val date = UnixEpochDateTime.plusDays(days) + val millis = date.atZone(ZoneId.systemDefault).toInstant.toEpochMilli + new Date(millis) + } + +} diff --git a/src/main/scala/com/exasol/cloudetl/util/SchemaUtil.scala b/src/main/scala/com/exasol/cloudetl/util/SchemaUtil.scala new file mode 100644 index 00000000..83a56164 --- /dev/null +++ b/src/main/scala/com/exasol/cloudetl/util/SchemaUtil.scala @@ -0,0 +1,173 @@ +package com.exasol.cloudetl.util + +import com.exasol.ExaIterator +import com.exasol.cloudetl.data.ExaColumnInfo + +import org.apache.parquet.schema.MessageType +import org.apache.parquet.schema.OriginalType +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName +import org.apache.parquet.schema.Type +import org.apache.parquet.schema.Type.Repetition 
+import org.apache.parquet.schema.Types + +object SchemaUtil { + + val DECIMAL_MAX_PRECISION: Int = 38 + val DECIMAL_MAX_INT_DIGITS: Int = 9 + val DECIMAL_MAX_LONG_DIGITS: Int = 18 + + // Maps the precision value into the number of bytes + // Adapted from: + // - org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe.java + val PRECISION_TO_BYTE_SIZE: Seq[Int] = { + for { + prec <- 1 to 38 // [1 .. 38] + power = Math.pow(10, prec.toDouble) // scalastyle:ignore magic.number + size = Math.ceil((Math.log(power - 1) / Math.log(2) + 1) / 8) + } yield size.toInt + } + + /** + * Given the Exasol column information returns Parquet [[org.apache.parquet.schema.MessageType]] + */ + def createParquetMessageType(columns: Seq[ExaColumnInfo], schemaName: String): MessageType = { + val types = columns.map(exaColumnToParquetType(_)) + new MessageType(schemaName, types: _*) + } + + // In below several lines, I try to pattern match on Class[X] of Java types. + // Please also read: + // https://stackoverflow.com/questions/7519140/pattern-matching-on-class-type + object JTypes { + val jInteger: Class[java.lang.Integer] = classOf[java.lang.Integer] + val jLong: Class[java.lang.Long] = classOf[java.lang.Long] + val jBigDecimal: Class[java.math.BigDecimal] = classOf[java.math.BigDecimal] + val jDouble: Class[java.lang.Double] = classOf[java.lang.Double] + val jBoolean: Class[java.lang.Boolean] = classOf[java.lang.Boolean] + val jString: Class[java.lang.String] = classOf[java.lang.String] + val jSqlDate: Class[java.sql.Date] = classOf[java.sql.Date] + val jSqlTimestamp: Class[java.sql.Timestamp] = classOf[java.sql.Timestamp] + } + + /** + * Given Exasol column [[com.exasol.cloudetl.data.ExaColumnInfo]] information convert it into + * Parquet [[org.apache.parquet.schema.Type$]] + */ + def exaColumnToParquetType(colInfo: ExaColumnInfo): Type = { + val colName = colInfo.name + val colType = colInfo.`type` + val repetition = if (colInfo.isNullable) Repetition.OPTIONAL else Repetition.REQUIRED + + import JTypes._ + + colType match { + case `jInteger` => + if (colInfo.precision == 0) { + Types + .primitive(PrimitiveTypeName.INT32, repetition) + .named(colName) + } else { + require( + colInfo.precision <= DECIMAL_MAX_INT_DIGITS, + s"Got an 'Integer' type with more than '$DECIMAL_MAX_INT_DIGITS' precision." + ) + Types + .primitive(PrimitiveTypeName.INT32, repetition) + .precision(colInfo.precision) + .scale(colInfo.scale) + .as(OriginalType.DECIMAL) + .named(colName) + } + + case `jLong` => + if (colInfo.precision == 0) { + Types + .primitive(PrimitiveTypeName.INT64, repetition) + .named(colName) + } else { + require( + colInfo.precision <= DECIMAL_MAX_LONG_DIGITS, + s"Got a 'Long' type with more than '$DECIMAL_MAX_LONG_DIGITS' precision." 
+ ) + Types + .primitive(PrimitiveTypeName.INT64, repetition) + .precision(colInfo.precision) + .scale(colInfo.scale) + .as(OriginalType.DECIMAL) + .named(colName) + } + + case `jBigDecimal` => + Types + .primitive(PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, repetition) + .precision(colInfo.precision) + .scale(colInfo.scale) + .length(PRECISION_TO_BYTE_SIZE(colInfo.precision - 1)) + .as(OriginalType.DECIMAL) + .named(colName) + + case `jDouble` => + Types + .primitive(PrimitiveTypeName.DOUBLE, repetition) + .named(colName) + + case `jString` => + if (colInfo.length > 0) { + Types + .primitive(PrimitiveTypeName.BINARY, repetition) + .as(OriginalType.UTF8) + .length(colInfo.length) + .named(colName) + } else { + Types + .primitive(PrimitiveTypeName.BINARY, repetition) + .as(OriginalType.UTF8) + .named(colName) + } + + case `jBoolean` => + Types + .primitive(PrimitiveTypeName.BOOLEAN, repetition) + .named(colName) + + case `jSqlDate` => + Types + .primitive(PrimitiveTypeName.INT32, repetition) + .as(OriginalType.DATE) + .named(colName) + + case `jSqlTimestamp` => + Types + .primitive(PrimitiveTypeName.INT96, repetition) + .named(colName) + + case _ => + throw new IllegalArgumentException( + s"Cannot convert Exasol type '$colType' to Parquet type." + ) + } + } + + /** + * Returns a value from Exasol [[ExaIterator]] iterator on given index which have + * [[com.exasol.cloudetl.data.ExaColumnInfo]] column type + */ + def exaColumnToValue(iter: ExaIterator, idx: Int, colInfo: ExaColumnInfo): Any = { + val colType = colInfo.`type` + import JTypes._ + + colType match { + case `jInteger` => iter.getInteger(idx) + case `jLong` => iter.getLong(idx) + case `jBigDecimal` => iter.getBigDecimal(idx) + case `jDouble` => iter.getDouble(idx) + case `jString` => iter.getString(idx) + case `jBoolean` => iter.getBoolean(idx) + case `jSqlDate` => iter.getDate(idx) + case `jSqlTimestamp` => iter.getTimestamp(idx) + case _ => + throw new IllegalArgumentException(s"Cannot get Exasol value for column type '$colType'.") + } + } + +} diff --git a/src/test/resources/parquet/sales_positions1.snappy.parquet b/src/test/resources/data/import/parquet/sales_positions1.snappy.parquet similarity index 100% rename from src/test/resources/parquet/sales_positions1.snappy.parquet rename to src/test/resources/data/import/parquet/sales_positions1.snappy.parquet diff --git a/src/test/resources/parquet/sales_positions2.snappy.parquet b/src/test/resources/data/import/parquet/sales_positions2.snappy.parquet similarity index 100% rename from src/test/resources/parquet/sales_positions2.snappy.parquet rename to src/test/resources/data/import/parquet/sales_positions2.snappy.parquet diff --git a/src/test/resources/parquet/sales_positions_small.snappy.parquet b/src/test/resources/data/import/parquet/sales_positions_small.snappy.parquet similarity index 100% rename from src/test/resources/parquet/sales_positions_small.snappy.parquet rename to src/test/resources/data/import/parquet/sales_positions_small.snappy.parquet diff --git a/src/test/scala/com/exasol/cloudetl/parquet/ParquetSourceSuite.scala b/src/test/scala/com/exasol/cloudetl/parquet/ParquetSourceSuite.scala new file mode 100644 index 00000000..63856e6d --- /dev/null +++ b/src/test/scala/com/exasol/cloudetl/parquet/ParquetSourceSuite.scala @@ -0,0 +1,54 @@ +package com.exasol.cloudetl.parquet + +import java.nio.file.Paths + +import com.exasol.cloudetl.util.FsUtil + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.FileSystem +import 
org.apache.parquet.schema.MessageTypeParser +import org.scalatest.FunSuite +import org.scalatest.Matchers + +@SuppressWarnings(Array("org.wartremover.warts.Any")) +class ParquetSourceSuite extends FunSuite with Matchers { + + private val conf = new Configuration() + private val fs = FileSystem.get(conf) + + private val salesPosParquetFile = + Paths.get(getClass.getResource("/data/import/parquet/sales_positions1.snappy.parquet").toURI) + + test("reads a single parquet file") { + val data = ParquetSource(FsUtil.globWithLocal(salesPosParquetFile, fs), fs, conf) + val iters = data.stream + assert(iters.map(_.size).sum === 500) + } + + private val resourcesDir = salesPosParquetFile.getParent + private val pattern = s"${resourcesDir.toUri.toString}/*.parquet" + + test("reads multiple parquet files") { + val iters = ParquetSource(pattern, fs, conf).stream() + assert(iters.map(_.size).sum === 1005) + } + + test("reads parquet files schema") { + val schema = ParquetSource(pattern, fs, conf).getSchema() + val expectedMsgType = MessageTypeParser + .parseMessageType("""message spark_schema { + | optional int64 sales_id; + | optional int32 position_id; + | optional int32 article_id; + | optional int32 amount; + | optional double price; + | optional int32 voucher_id; + | optional boolean canceled; + |} + """.stripMargin) + + assert(schema.isDefined) + schema.foreach { case msgType => assert(msgType === expectedMsgType) } + } + +} diff --git a/src/test/scala/com/exasol/cloudetl/scriptclasses/BaseImportSuite.scala b/src/test/scala/com/exasol/cloudetl/scriptclasses/BaseSuite.scala similarity index 78% rename from src/test/scala/com/exasol/cloudetl/scriptclasses/BaseImportSuite.scala rename to src/test/scala/com/exasol/cloudetl/scriptclasses/BaseSuite.scala index 9f65afe7..b5f058e9 100644 --- a/src/test/scala/com/exasol/cloudetl/scriptclasses/BaseImportSuite.scala +++ b/src/test/scala/com/exasol/cloudetl/scriptclasses/BaseSuite.scala @@ -11,7 +11,7 @@ import org.scalatest.FunSuite import org.scalatest.Matchers import org.scalatest.mockito.MockitoSugar -trait BaseImportSuite extends FunSuite with Matchers with MockitoSugar { +trait BaseSuite extends FunSuite with Matchers with MockitoSugar { val testSchema = "my_schema" @@ -27,8 +27,12 @@ trait BaseImportSuite extends FunSuite with Matchers with MockitoSugar { "S3_SECRET_KEY" -> s3SecretKey ) - val resourcePath: String = norm(Paths.get(getClass.getResource("/parquet").toURI)) - val resourceBucket: String = s"$resourcePath/*.parquet" + val rest = + s"""BUCKET_PATH:=:$s3BucketPath;S3_ENDPOINT:=:$s3Endpoint;""" + + s"""S3_ACCESS_KEY:=:$s3AccessKey;S3_SECRET_KEY:=:$s3SecretKey""" + + val resourcePath: String = norm(Paths.get(getClass.getResource("/data").toURI)) + val resourceImportBucket: String = s"$resourcePath/import/parquet/*.parquet" final def norm(path: Path): String = path.toUri.toString.replaceAll("/$", "").replaceAll("///", "/") diff --git a/src/test/scala/com/exasol/cloudetl/scriptclasses/ExportPathSuite.scala b/src/test/scala/com/exasol/cloudetl/scriptclasses/ExportPathSuite.scala new file mode 100644 index 00000000..823af5c0 --- /dev/null +++ b/src/test/scala/com/exasol/cloudetl/scriptclasses/ExportPathSuite.scala @@ -0,0 +1,77 @@ +package com.exasol.cloudetl.scriptclasses + +import scala.collection.JavaConverters._ + +import com.exasol.ExaExportSpecification +import com.exasol.ExaMetadata + +import org.mockito.Mockito._ + +class ExportPathSuite extends BaseSuite { + + test("`generateSqlForExportSpec` should create a sql statement") { + val exaMeta 
= mock[ExaMetadata] + val exaSpec = mock[ExaExportSpecification] + + when(exaMeta.getScriptSchema()).thenReturn(testSchema) + when(exaSpec.getParameters()).thenReturn(params.asJava) + + val srcCols = Seq("tbl.col_int", "c_bool", "c_char") + when(exaSpec.getSourceColumnNames).thenReturn(srcCols.asJava) + + val sqlExpected = + s"""SELECT + | $testSchema.EXPORT_TABLE( + | '$s3BucketPath', '$rest', 'col_int.c_bool.c_char', col_int, c_bool, c_char + |) + |FROM + | DUAL + |GROUP BY + | nproc(); + |""".stripMargin + + assert(ExportPath.generateSqlForExportSpec(exaMeta, exaSpec) === sqlExpected) + verify(exaMeta, atLeastOnce).getScriptSchema + verify(exaSpec, times(1)).getParameters + verify(exaSpec, times(1)).getSourceColumnNames + } + + test("`generateSqlForExportSpec` should throw an exception if any required param is missing") { + val exaMeta = mock[ExaMetadata] + val exaSpec = mock[ExaExportSpecification] + + val newParams = params - ("S3_ACCESS_KEY") + + when(exaMeta.getScriptSchema()).thenReturn(testSchema) + when(exaSpec.getParameters()).thenReturn(newParams.asJava) + + val thrown = intercept[IllegalArgumentException] { + ExportPath.generateSqlForExportSpec(exaMeta, exaSpec) + } + + assert(thrown.getMessage === "The required parameter S3_ACCESS_KEY is not defined!") + verify(exaSpec, times(1)).getParameters + verify(exaSpec, never).getSourceColumnNames + } + + test("`generateSqlForExportSpec` throws if column cannot be parsed (contains extra '.')") { + val exaMeta = mock[ExaMetadata] + val exaSpec = mock[ExaExportSpecification] + + when(exaMeta.getScriptSchema()).thenReturn(testSchema) + when(exaSpec.getParameters()).thenReturn(params.asJava) + + val srcCols = Seq("tbl.c_int.integer") + when(exaSpec.getSourceColumnNames).thenReturn(srcCols.asJava) + + val thrown = intercept[RuntimeException] { + ExportPath.generateSqlForExportSpec(exaMeta, exaSpec) + } + + assert(thrown.getMessage === "Could not parse the column name from 'tbl.c_int.integer'!") + verify(exaMeta, atLeastOnce).getScriptSchema + verify(exaSpec, times(1)).getParameters + verify(exaSpec, times(1)).getSourceColumnNames + } + +} diff --git a/src/test/scala/com/exasol/cloudetl/scriptclasses/ExportTableSuite.scala b/src/test/scala/com/exasol/cloudetl/scriptclasses/ExportTableSuite.scala new file mode 100644 index 00000000..6a50a175 --- /dev/null +++ b/src/test/scala/com/exasol/cloudetl/scriptclasses/ExportTableSuite.scala @@ -0,0 +1,149 @@ +package com.exasol.cloudetl.scriptclasses + +import java.io.IOException +import java.nio.file._ +import java.nio.file.attribute.BasicFileAttributes + +import com.exasol.ExaIterator +import com.exasol.ExaMetadata + +import org.mockito.ArgumentMatchers.any +import org.mockito.ExtraMockito +import org.mockito.Mockito._ + +@SuppressWarnings(Array("org.wartremover.warts.JavaSerializable")) +class ExportTableSuite extends BaseSuite { + + val srcColumns: Seq[String] = Seq( + "c_int", + "c_long", + "c_decimal", + "c_double", + "c_string", + "c_boolean", + "c_date", + "c_timestamp" + ) + + private val bd1 = new java.math.BigDecimal("5555555555555555555555555555555.55555") + private val bd2 = new java.math.BigDecimal("5555555555555555555555555555555.55555") + private val dt1 = new java.sql.Date(System.currentTimeMillis()) + private val dt2 = new java.sql.Date(System.currentTimeMillis()) + private val ts1 = new java.sql.Timestamp(System.currentTimeMillis()) + private val ts2 = new java.sql.Timestamp(System.currentTimeMillis()) + + val records: Seq[Seq[Object]] = Seq( + Seq(1, 3L, bd1, 3.14d, "xyz", true, 
dt1, ts1), + Seq(2, 4L, bd2, 0.13d, "abc", false, dt2, ts2) + ).map { seq => + seq.map(_.asInstanceOf[AnyRef]) + } + + final def createMockedIter(resourceDir: String): ExaIterator = { + val mockedIter = commonExaIterator(resourceDir) + when(mockedIter.getString(2)).thenReturn(srcColumns.mkString(".")) + when(mockedIter.next()).thenReturn(true, false) + + when(mockedIter.getInteger(3)).thenReturn(1, 2) + when(mockedIter.getLong(4)).thenReturn(3L, 4L) + when(mockedIter.getBigDecimal(5)).thenReturn(bd1, bd2) + when(mockedIter.getDouble(6)).thenReturn(3.14, 0.13) + when(mockedIter.getString(7)).thenReturn("xyz", "abc") + when(mockedIter.getBoolean(8)).thenReturn(true, false) + when(mockedIter.getDate(9)).thenReturn(dt1, dt2) + when(mockedIter.getTimestamp(10)).thenReturn(ts1, ts2) + + mockedIter + } + + final def createMockedMeta(): ExaMetadata = { + val mockedMeta = mock[ExaMetadata] + when(mockedMeta.getInputColumnCount()).thenReturn(11L) + val returns = Seq( + (3, classOf[java.lang.Integer], 0L, 0L, 0L), + (4, classOf[java.lang.Long], 0L, 0L, 0L), + (5, classOf[java.math.BigDecimal], 36L, 5L, 0L), + (6, classOf[java.lang.Double], 0L, 0L, 0L), + (7, classOf[java.lang.String], 0L, 0L, 3L), + (8, classOf[java.lang.Boolean], 0L, 0L, 0L), + (9, classOf[java.sql.Date], 0L, 0L, 0L), + (10, classOf[java.sql.Timestamp], 0L, 0L, 0L) + ) + returns.foreach { + case (idx, cls, prec, scale, len) => + ExtraMockito.doReturn(cls).when(mockedMeta).getInputColumnType(idx) + when(mockedMeta.getInputColumnPrecision(idx)).thenReturn(prec) + when(mockedMeta.getInputColumnScale(idx)).thenReturn(scale) + when(mockedMeta.getInputColumnLength(idx)).thenReturn(len) + } + + mockedMeta + } + + test("`run` should export the Exasol rows from ExaIterator") { + val tempDir = Files.createTempDirectory("exportTableTest") + + val meta = createMockedMeta() + val iter = createMockedIter(tempDir.toUri.toString) + + ExportTable.run(meta, iter) + + verify(meta, times(1)).getInputColumnCount + for { idx <- 3 to 10 } { + verify(meta, times(1)).getInputColumnType(idx) + verify(meta, times(1)).getInputColumnPrecision(idx) + verify(meta, times(1)).getInputColumnScale(idx) + verify(meta, times(1)).getInputColumnLength(idx) + } + + verify(iter, times(2)).getInteger(3) + verify(iter, times(2)).getLong(4) + verify(iter, times(2)).getBigDecimal(5) + verify(iter, times(2)).getDouble(6) + verify(iter, times(2)).getString(7) + verify(iter, times(2)).getBoolean(8) + verify(iter, times(2)).getDate(9) + verify(iter, times(2)).getTimestamp(10) + + deleteFiles(tempDir) + } + + test("import exported rows from a file") { + val tempDir = Files.createTempDirectory("importExportTableTest") + val meta = createMockedMeta() + val iter = createMockedIter(tempDir.toUri.toString) + + ExportTable.run(meta, iter) + + val importIter = commonExaIterator(resourceImportBucket) + when(importIter.next()).thenReturn(false) + when(importIter.getString(2)).thenReturn(tempDir.toUri.toString) + + ImportFiles.run(mock[ExaMetadata], importIter) + + val totalRecords = 2 + verify(importIter, times(totalRecords)).emit(Seq(any[Object]): _*) + + // TODO: verify each emitted row + + deleteFiles(tempDir) + } + + final def deleteFiles(dir: Path): Unit = { + Files.walkFileTree( + dir, + new SimpleFileVisitor[Path] { + override def visitFile(file: Path, attrs: BasicFileAttributes): FileVisitResult = { + Files.delete(file) + FileVisitResult.CONTINUE + } + override def postVisitDirectory(dir: Path, exc: IOException): FileVisitResult = { + Files.delete(dir) + FileVisitResult.CONTINUE + } 
+ } + ) + () + } + +} diff --git a/src/test/scala/com/exasol/cloudetl/scriptclasses/ImportFilesSuite.scala b/src/test/scala/com/exasol/cloudetl/scriptclasses/ImportFilesSuite.scala index 1125f84f..56784747 100644 --- a/src/test/scala/com/exasol/cloudetl/scriptclasses/ImportFilesSuite.scala +++ b/src/test/scala/com/exasol/cloudetl/scriptclasses/ImportFilesSuite.scala @@ -6,13 +6,13 @@ import org.mockito.ArgumentMatchers.any import org.mockito.Mockito._ @SuppressWarnings(Array("org.wartremover.warts.Any")) -class ImportFilesSuite extends BaseImportSuite { +class ImportFilesSuite extends BaseSuite { test("`run` should emit total number of records") { - val file1 = s"$resourcePath/sales_positions1.snappy.parquet" - val file2 = s"$resourcePath/sales_positions2.snappy.parquet" + val file1 = s"$resourcePath/import/parquet/sales_positions1.snappy.parquet" + val file2 = s"$resourcePath/import/parquet/sales_positions2.snappy.parquet" - val exaIter = commonExaIterator(resourceBucket) + val exaIter = commonExaIterator(resourceImportBucket) when(exaIter.next()).thenReturn(true, false) when(exaIter.getString(2)).thenReturn(file1, file2) @@ -37,10 +37,10 @@ class ImportFilesSuite extends BaseImportSuite { * +---------+-----------+----------+------+-----+----------+--------+ * */ - test("`run` should emit corrent sequence of records") { - val file = s"$resourcePath/sales_positions_small.snappy.parquet" + test("`run` should emit correct sequence of records") { + val file = s"$resourcePath/import/parquet/sales_positions_small.snappy.parquet" - val exaIter = commonExaIterator(resourceBucket) + val exaIter = commonExaIterator(resourceImportBucket) when(exaIter.next()).thenReturn(false) when(exaIter.getString(2)).thenReturn(file) diff --git a/src/test/scala/com/exasol/cloudetl/scriptclasses/ImportMetadataSuite.scala b/src/test/scala/com/exasol/cloudetl/scriptclasses/ImportMetadataSuite.scala index 7082ef39..35193c20 100644 --- a/src/test/scala/com/exasol/cloudetl/scriptclasses/ImportMetadataSuite.scala +++ b/src/test/scala/com/exasol/cloudetl/scriptclasses/ImportMetadataSuite.scala @@ -5,18 +5,21 @@ import com.exasol.ExaMetadata import org.mockito.ArgumentMatchers.anyString import org.mockito.Mockito._ -class ImportMetadataSuite extends BaseImportSuite { +class ImportMetadataSuite extends BaseSuite { test("`run` should create a list of files names") { - val exaIter = commonExaIterator(resourceBucket) + val exaIter = commonExaIterator(resourceImportBucket) when(exaIter.getInteger(2)).thenReturn(2) ImportMetadata.run(mock[ExaMetadata], exaIter) verify(exaIter, times(3)).emit(anyString(), anyString()) - verify(exaIter, times(1)).emit(s"$resourcePath/sales_positions1.snappy.parquet", "0") - verify(exaIter, times(1)).emit(s"$resourcePath/sales_positions2.snappy.parquet", "1") - verify(exaIter, times(1)).emit(s"$resourcePath/sales_positions_small.snappy.parquet", "0") + verify(exaIter, times(1)) + .emit(s"$resourcePath/import/parquet/sales_positions1.snappy.parquet", "0") + verify(exaIter, times(1)) + .emit(s"$resourcePath/import/parquet/sales_positions2.snappy.parquet", "1") + verify(exaIter, times(1)) + .emit(s"$resourcePath/import/parquet/sales_positions_small.snappy.parquet", "0") } } diff --git a/src/test/scala/com/exasol/cloudetl/scriptclasses/ImportPathSuite.scala b/src/test/scala/com/exasol/cloudetl/scriptclasses/ImportPathSuite.scala index 9a58e602..4c23f156 100644 --- a/src/test/scala/com/exasol/cloudetl/scriptclasses/ImportPathSuite.scala +++ 
b/src/test/scala/com/exasol/cloudetl/scriptclasses/ImportPathSuite.scala @@ -7,7 +7,7 @@ import com.exasol.ExaMetadata import org.mockito.Mockito._ -class ImportPathSuite extends BaseImportSuite { +class ImportPathSuite extends BaseSuite { test("`generateSqlForImportSpec` should create a sql statement") { val exaMeta = mock[ExaMetadata] @@ -16,13 +16,8 @@ class ImportPathSuite extends BaseImportSuite { when(exaMeta.getScriptSchema()).thenReturn(testSchema) when(exaSpec.getParameters()).thenReturn(params.asJava) - val rest = - s"""BUCKET_PATH:=:$s3BucketPath;S3_ENDPOINT:=:$s3Endpoint;""" + - s"""S3_ACCESS_KEY:=:$s3AccessKey;S3_SECRET_KEY:=:$s3SecretKey""" - val sqlExpected = - s""" - |SELECT + s"""SELECT | $testSchema.IMPORT_FILES( | '$s3BucketPath', '$rest', filename |) @@ -33,9 +28,9 @@ class ImportPathSuite extends BaseImportSuite { |) |GROUP BY | partition_index; - """.stripMargin + |""".stripMargin - assert(ImportPath.generateSqlForImportSpec(exaMeta, exaSpec).trim === sqlExpected.trim) + assert(ImportPath.generateSqlForImportSpec(exaMeta, exaSpec) === sqlExpected) verify(exaMeta, atLeastOnce).getScriptSchema verify(exaSpec, times(1)).getParameters } diff --git a/src/test/scala/com/exasol/cloudetl/source/ParquetSourceSuite.scala b/src/test/scala/com/exasol/cloudetl/source/ParquetSourceSuite.scala deleted file mode 100644 index 1966f21a..00000000 --- a/src/test/scala/com/exasol/cloudetl/source/ParquetSourceSuite.scala +++ /dev/null @@ -1,35 +0,0 @@ -package com.exasol.cloudetl.source - -import java.nio.file.Paths - -import com.exasol.cloudetl.util.FsUtil - -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.FileSystem -import org.scalatest.FunSuite -import org.scalatest.Matchers - -@SuppressWarnings(Array("org.wartremover.warts.Any")) -class ParquetSourceSuite extends FunSuite with Matchers { - - private val conf = new Configuration() - private val fs = FileSystem.get(conf) - - private val salesPosParquetFile = - Paths.get(getClass.getResource("/parquet/sales_positions1.snappy.parquet").toURI) - - test("reads a single parquet file") { - val data = ParquetSource(FsUtil.globWithLocal(salesPosParquetFile, fs), fs, conf) - val iters = data.stream - assert(iters.map(_.size).sum === 500) - } - - test("reads multiple parquet files") { - val resourcesDir = salesPosParquetFile.getParent - val pattern = s"${resourcesDir.toUri.toString}/*.parquet" - val data = ParquetSource(pattern, fs, conf) - val iters = data.stream - assert(iters.map(_.size).sum === 1005) - } - -} diff --git a/src/test/scala/com/exasol/cloudetl/util/DateTimeUtilSuite.scala b/src/test/scala/com/exasol/cloudetl/util/DateTimeUtilSuite.scala new file mode 100644 index 00000000..bc6e65db --- /dev/null +++ b/src/test/scala/com/exasol/cloudetl/util/DateTimeUtilSuite.scala @@ -0,0 +1,51 @@ +package com.exasol.cloudetl.util + +import java.sql.Date +import java.text.SimpleDateFormat +import java.util.Locale + +import org.scalatest.FunSuite +import org.scalatest.Matchers + +@SuppressWarnings(Array("org.wartremover.contrib.warts.OldTime")) +class DateTimeUtilSuite extends FunSuite with Matchers { + + final def daysSinceEpochToDate(dt: Date): Unit = { + val newDT = DateTimeUtil.daysToDate(DateTimeUtil.daysSinceEpoch(dt)) + assert(dt.toString === newDT.toString) + () + } + + test("from java.sql.Date to days (since epoch) and back to java.sql.Date") { + val df1 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US) + val df2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss z", Locale.US) + + val testDate = Seq( + new 
Date(100), + new Date(df1.parse("1776-07-04 10:30:00").getTime), + new Date(df2.parse("1776-07-04 18:30:00 UTC").getTime), + Date.valueOf("1912-05-05"), + Date.valueOf("1969-01-01"), + new Date(df1.parse("1969-01-01 00:00:00").getTime), + new Date(df2.parse("1969-01-01 00:00:00 UTC").getTime), + new Date(df1.parse("1969-01-01 00:00:01").getTime), + new Date(df2.parse("1969-01-01 00:00:01 UTC").getTime), + new Date(df1.parse("1969-12-31 23:59:59").getTime), + new Date(df2.parse("1969-12-31 23:59:59 UTC").getTime), + Date.valueOf("1970-01-01"), + new Date(df1.parse("1970-01-01 00:00:00").getTime), + new Date(df2.parse("1970-01-01 00:00:00 UTC").getTime), + new Date(df1.parse("1970-01-01 00:00:01").getTime), + new Date(df2.parse("1970-01-01 00:00:01 UTC").getTime), + new Date(df1.parse("1989-11-09 11:59:59").getTime), + new Date(df2.parse("1989-11-09 19:59:59 UTC").getTime), + Date.valueOf("2019-02-10") + ) + + testDate.foreach { case dt => daysSinceEpochToDate(dt) } + } + + test("correctly converts date `0001-01-01` to days and back to date") { + daysSinceEpochToDate(Date.valueOf("0001-01-01")) + } +} diff --git a/src/test/scala/com/exasol/cloudetl/util/SchemaUtilSuite.scala b/src/test/scala/com/exasol/cloudetl/util/SchemaUtilSuite.scala new file mode 100644 index 00000000..7b6f0adf --- /dev/null +++ b/src/test/scala/com/exasol/cloudetl/util/SchemaUtilSuite.scala @@ -0,0 +1,162 @@ +package com.exasol.cloudetl.util + +import com.exasol.ExaIterator +import com.exasol.cloudetl.data.ExaColumnInfo + +import org.apache.parquet.schema._ +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName +import org.apache.parquet.schema.Type.Repetition +import org.mockito.Mockito._ +import org.scalatest.FunSuite +import org.scalatest.Matchers +import org.scalatest.mockito.MockitoSugar + +@SuppressWarnings(Array("org.wartremover.contrib.warts.ExposedTuples")) +class SchemaUtilSuite extends FunSuite with Matchers with MockitoSugar { + + test("`createParquetMessageType` throws an exception for unknown type") { + val thrown = intercept[IllegalArgumentException] { + SchemaUtil.createParquetMessageType( + Seq(ExaColumnInfo("c_short", classOf[java.lang.Short], 0, 0, 0, false)), + "test_schema" + ) + } + val expectedMsg = s"Cannot convert Exasol type '${classOf[java.lang.Short]}' to Parquet type." 
+ assert(thrown.getMessage === expectedMsg) + } + + test("`createParquetMessageType` creates parquet message type from list of exa columns") { + + val exaColumns = Seq( + ExaColumnInfo("c_int", classOf[java.lang.Integer], 0, 0, 0, true), + ExaColumnInfo("c_int", classOf[java.lang.Integer], 1, 0, 0, true), + ExaColumnInfo("c_int", classOf[java.lang.Integer], 9, 0, 0, true), + ExaColumnInfo("c_long", classOf[java.lang.Long], 0, 0, 0, false), + ExaColumnInfo("c_long", classOf[java.lang.Long], 18, 0, 0, true), + ExaColumnInfo("c_decimal_int", classOf[java.math.BigDecimal], 9, 0, 0, false), + ExaColumnInfo("c_decimal_long", classOf[java.math.BigDecimal], 17, 0, 0, false), + ExaColumnInfo("c_decimal", classOf[java.math.BigDecimal], 38, 10, 16, false), + ExaColumnInfo("c_double", classOf[java.lang.Double], 0, 0, 0, true), + ExaColumnInfo("c_string", classOf[java.lang.String], 0, 0, 0, false), + ExaColumnInfo("c_string", classOf[java.lang.String], 0, 0, 20, false), + ExaColumnInfo("c_boolean", classOf[java.lang.Boolean], 0, 0, 0, false), + ExaColumnInfo("c_date", classOf[java.sql.Date], 0, 0, 0, false), + ExaColumnInfo("c_timestamp", classOf[java.sql.Timestamp], 0, 0, 0, false) + ) + + val schemaName = "exasol_export_schema" + + val messageType = new MessageType( + schemaName, + new PrimitiveType(Repetition.OPTIONAL, PrimitiveType.PrimitiveTypeName.INT32, "c_int"), + Types + .primitive(PrimitiveTypeName.INT32, Repetition.OPTIONAL) + .precision(1) + .scale(0) + .as(OriginalType.DECIMAL) + .named("c_int"), + Types + .primitive(PrimitiveTypeName.INT32, Repetition.OPTIONAL) + .precision(9) + .scale(0) + .as(OriginalType.DECIMAL) + .named("c_int"), + new PrimitiveType(Repetition.REQUIRED, PrimitiveType.PrimitiveTypeName.INT64, "c_long"), + Types + .primitive(PrimitiveTypeName.INT64, Repetition.OPTIONAL) + .precision(18) + .scale(0) + .as(OriginalType.DECIMAL) + .named("c_long"), + Types + .primitive(PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, Repetition.REQUIRED) + .precision(9) + .scale(0) + .length(4) + .as(OriginalType.DECIMAL) + .named("c_decimal_int"), + Types + .primitive(PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, Repetition.REQUIRED) + .precision(17) + .scale(0) + .length(8) + .as(OriginalType.DECIMAL) + .named("c_decimal_long"), + Types + .primitive(PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, Repetition.REQUIRED) + .precision(38) + .scale(10) + .length(16) + .as(OriginalType.DECIMAL) + .named("c_decimal"), + new PrimitiveType(Repetition.OPTIONAL, PrimitiveType.PrimitiveTypeName.DOUBLE, "c_double"), + new PrimitiveType( + Repetition.REQUIRED, + PrimitiveType.PrimitiveTypeName.BINARY, + "c_string", + OriginalType.UTF8 + ), + Types + .primitive(PrimitiveTypeName.BINARY, Repetition.REQUIRED) + .length(20) + .as(OriginalType.UTF8) + .named("c_string"), + new PrimitiveType( + Repetition.REQUIRED, + PrimitiveType.PrimitiveTypeName.BOOLEAN, + "c_boolean" + ), + Types + .primitive(PrimitiveTypeName.INT32, Repetition.REQUIRED) + .as(OriginalType.DATE) + .named("c_date"), + new PrimitiveType(Repetition.REQUIRED, PrimitiveType.PrimitiveTypeName.INT96, "c_timestamp") + ) + + assert(SchemaUtil.createParquetMessageType(exaColumns, schemaName) === messageType) + } + + test("`exaColumnToValue` returns value with column type") { + val iter = mock[ExaIterator] + val startIdx = 3 + val bd = new java.math.BigDecimal(1337) + val dt = new java.sql.Date(System.currentTimeMillis()) + val ts = new java.sql.Timestamp(System.currentTimeMillis()) + + when(iter.getInteger(3)).thenReturn(1) + when(iter.getLong(4)).thenReturn(3L) + 
when(iter.getBigDecimal(5)).thenReturn(bd)
+    when(iter.getDouble(6)).thenReturn(3.14)
+    when(iter.getString(7)).thenReturn("xyz")
+    when(iter.getBoolean(8)).thenReturn(true)
+    when(iter.getDate(9)).thenReturn(dt)
+    when(iter.getTimestamp(10)).thenReturn(ts)
+
+    val data = Seq(
+      1 -> ExaColumnInfo("c_int", classOf[java.lang.Integer]),
+      3L -> ExaColumnInfo("c_long", classOf[java.lang.Long]),
+      bd -> ExaColumnInfo("c_decimal", classOf[java.math.BigDecimal]),
+      3.14 -> ExaColumnInfo("c_double", classOf[java.lang.Double]),
+      "xyz" -> ExaColumnInfo("c_string", classOf[java.lang.String]),
+      true -> ExaColumnInfo("c_boolean", classOf[java.lang.Boolean]),
+      dt -> ExaColumnInfo("c_date", classOf[java.sql.Date]),
+      ts -> ExaColumnInfo("c_timestamp", classOf[java.sql.Timestamp])
+    )
+
+    data.zipWithIndex.map {
+      case ((expectedValue, col), idx) =>
+        val nxtIdx = startIdx + idx
+        val ret = SchemaUtil.exaColumnToValue(iter, nxtIdx, col)
+        assert(ret === expectedValue)
+        assert(ret.getClass === col.`type`)
+    }
+
+    val thrown = intercept[IllegalArgumentException] {
+      SchemaUtil.exaColumnToValue(iter, 0, ExaColumnInfo("c_short", classOf[java.lang.Short]))
+    }
+    assert(
+      thrown.getMessage === "Cannot get Exasol value for column type 'class java.lang.Short'."
+    )
+
+  }
+}
diff --git a/src/test/scala/org/mockito/ExtraMockito.scala b/src/test/scala/org/mockito/ExtraMockito.scala
new file mode 100644
index 00000000..1be7fd22
--- /dev/null
+++ b/src/test/scala/org/mockito/ExtraMockito.scala
@@ -0,0 +1,28 @@
+package org.mockito
+
+import org.mockito.stubbing.Stubber
+
+/** Extra helper functions for mockito mocking */
+object ExtraMockito {
+
+  /**
+   * Delegates the call to Mockito.doReturn(toBeReturned, toBeReturnedNext) but fixes
+   * the following compiler issue that happens because of the overloaded vararg on the Java side
+   *
+   * {{{
+   * Error:(33, 25) ambiguous reference to overloaded definition, both method doReturn in class
+   * Mockito of type (x$1: Any, x$2: Object*)org.mockito.stubbing.Stubber and method doReturn
+   * in class Mockito of type (x$1: Any)org.mockito.stubbing.Stubber match argument types
+   * (`Type`)
+   * }}}
+   *
+   * This is adapted from the mockito-scala project:
+   * - mockito-scala/blob/master/core/src/main/scala/org/mockito/MockitoAPI.scala#L59
+   */
+  def doReturn[T](toBeReturned: T, toBeReturnedNext: T*): Stubber =
+    Mockito.doReturn(
+      toBeReturned,
+      toBeReturnedNext.map(_.asInstanceOf[Object]): _*
+    )
+
+}
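
For reference, here is a minimal, hypothetical usage sketch of the `ExtraMockito.doReturn` helper introduced above. It is not part of the patch; the `ExtraMockitoUsageSketch` name is invented for illustration, while `ExtraMockito`, `Mockito`, and `ExaMetadata` come from the changes and dependencies in this diff. It stubs `ExaMetadata.getInputColumnType`, the same call stubbed by `createMockedMeta()` in the export table test suite, which is exactly where the plain `Mockito.doReturn` overload ambiguity shows up from Scala:

```scala
import com.exasol.ExaMetadata

import org.mockito.ExtraMockito
import org.mockito.Mockito

// Hypothetical standalone sketch; only the stubbing pattern mirrors the test suites above.
object ExtraMockitoUsageSketch extends App {
  val meta: ExaMetadata = Mockito.mock(classOf[ExaMetadata])

  // Calling Mockito.doReturn(classOf[java.lang.Integer]) directly is ambiguous from Scala
  // because of the Java vararg overload; the helper always forwards to the vararg variant.
  ExtraMockito.doReturn(classOf[java.lang.Integer]).when(meta).getInputColumnType(3)

  println(meta.getInputColumnType(3)) // prints: class java.lang.Integer
}
```

The helper is only needed for the `doReturn(...).when(mock)` stubbing style; the more common `when(mock.call(...)).thenReturn(...)` form used elsewhere in these suites does not hit the ambiguity.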