diff --git a/src/main/scala/com/exasol/cloudetl/parquet/RowReadSupport.scala b/src/main/scala/com/exasol/cloudetl/parquet/RowReadSupport.scala
index 0d00d873..716d73f9 100644
--- a/src/main/scala/com/exasol/cloudetl/parquet/RowReadSupport.scala
+++ b/src/main/scala/com/exasol/cloudetl/parquet/RowReadSupport.scala
@@ -124,7 +124,7 @@ class RowRootConverter(schema: GroupType) extends GroupConverter {
         }
       case PrimitiveTypeName.INT64 =>
         originalType match {
-          case OriginalType.TIMESTAMP_MILLIS => new RowTimestampConverter(this, idx)
+          case OriginalType.TIMESTAMP_MILLIS => new RowTimestampMillisConverter(this, idx)
           case OriginalType.DECIMAL =>
             val decimalMetadata = primitiveType.getDecimalMetadata
             new RowDecimalConverter(
@@ -136,12 +136,7 @@ class RowRootConverter(schema: GroupType) extends GroupConverter {
           case _ => new RowPrimitiveConverter(this, idx)
         }
 
-      case PrimitiveTypeName.INT96 => new RowTimestampConverter(this, idx)
-
-      case _ =>
-        throw new UnsupportedOperationException(
-          s"Parquet type '$typeName' cannot be read into Exasol type."
-        )
+      case PrimitiveTypeName.INT96 => new RowTimestampInt96Converter(this, idx)
     }
   }
 
@@ -149,7 +144,7 @@ class RowRootConverter(schema: GroupType) extends GroupConverter {
     extends PrimitiveConverter {
 
     override def addBinary(value: Binary): Unit =
-      parent.currentResult.update(index, value.getBytes())
+      parent.currentResult.update(index, new String(value.getBytes()))
 
     override def addBoolean(value: Boolean): Unit =
       parent.currentResult.update(index, value)
@@ -194,7 +189,14 @@ class RowRootConverter(schema: GroupType) extends GroupConverter {
     }
   }
 
-  private final class RowTimestampConverter(val parent: RowRootConverter, val index: Int)
+  private final class RowTimestampMillisConverter(val parent: RowRootConverter, val index: Int)
+    extends PrimitiveConverter {
+
+    override def addLong(value: Long): Unit =
+      parent.currentResult.update(index, DateTimeUtil.getTimestampFromMillis(value))
+  }
+
+  private final class RowTimestampInt96Converter(val parent: RowRootConverter, val index: Int)
     extends PrimitiveConverter {
 
     override def addBinary(value: Binary): Unit = {
diff --git a/src/main/scala/com/exasol/cloudetl/util/DateTimeUtil.scala b/src/main/scala/com/exasol/cloudetl/util/DateTimeUtil.scala
index 84df7c37..8d397b46 100644
--- a/src/main/scala/com/exasol/cloudetl/util/DateTimeUtil.scala
+++ b/src/main/scala/com/exasol/cloudetl/util/DateTimeUtil.scala
@@ -6,7 +6,7 @@ import java.time._
 import java.util.TimeZone
 
 /**
- * Helper functions to convert date time values
+ * Helper functions to convert date time values.
  */
 object DateTimeUtil {
   // scalastyle:off magic.number
@@ -21,11 +21,14 @@ object DateTimeUtil {
   val MICROS_PER_SECOND: Long = MICROS_PER_MILLIS * MILLIS_PER_SECOND
   val MICROS_PER_DAY: Long = MICROS_PER_SECOND * SECONDS_PER_DAY
 
-  /** Returns a [[java.sql.Timestamp]] timestamp from number of microseconds since epoch */
+  /**
+   * Returns a [[java.sql.Timestamp]] timestamp from number of
+   * microseconds since epoch.
+   */
   @SuppressWarnings(Array("org.wartremover.warts.Var"))
   def getTimestampFromMicros(us: Long): Timestamp = {
-    // setNanos() will overwrite the millisecond part, so the milliseconds should be cut off at
-    // seconds
+    // setNanos() will overwrite the millisecond part, so the
+    // milliseconds should be cut off at seconds
     var seconds = us / MICROS_PER_SECOND
     var micros = us % MICROS_PER_SECOND
     if (micros < 0) { // setNanos() can not accept negative value
@@ -38,7 +41,17 @@ object DateTimeUtil {
     ts
   }
 
-  /** Returns the number of micros since epoch from [[java.sql.Timestamp]] */
+  /**
+   * Returns a [[java.sql.Timestamp]] timestamp from number of
+   * milliseconds since epoch.
+   */
+  def getTimestampFromMillis(millis: Long): Timestamp =
+    new Timestamp(millis)
+
+  /**
+   * Returns the number of micros since epoch from
+   * [[java.sql.Timestamp]].
+   */
   def getMicrosFromTimestamp(ts: Timestamp): Long =
     if (ts != null) {
       ts.getTime() * 1000L + (ts.getNanos().toLong / 1000) % 1000L
@@ -46,7 +59,10 @@ object DateTimeUtil {
       0L
     }
 
-  /** Returns Julian day and nanoseconds in a day from microseconds since epoch */
+  /**
+   * Returns Julian day and nanoseconds in a day from microseconds since
+   * epoch.
+   */
   @SuppressWarnings(Array("org.wartremover.contrib.warts.ExposedTuples"))
   def getJulianDayAndNanos(us: Long): (Int, Long) = {
     val julian_us = us + JULIAN_DAY_OF_EPOCH * MICROS_PER_DAY
@@ -55,13 +71,16 @@ object DateTimeUtil {
     val day = julian_us / MICROS_PER_DAY
     val micros = julian_us % MICROS_PER_DAY
     (day.toInt, micros * 1000L)
   }
 
-  /** Returns microseconds since epoch from Julian day and nanoseconds in a day */
+  /**
+   * Returns microseconds since epoch from Julian day and nanoseconds in
+   * a day.
+   */
   def getMicrosFromJulianDay(day: Int, nanos: Long): Long = {
     val seconds = (day - JULIAN_DAY_OF_EPOCH).toLong * SECONDS_PER_DAY
     seconds * MICROS_PER_SECOND + nanos / 1000L
   }
 
-  /** Returns the number of days since unix epoch */
+  /** Returns the number of days since unix epoch. */
   @SuppressWarnings(Array("org.wartremover.contrib.warts.OldTime"))
   def daysSinceEpoch(date: Date): Long = {
     val millisUtc = date.getTime
@@ -69,7 +88,7 @@ object DateTimeUtil {
     val millis = millisUtc + TimeZone.getDefault().getOffset(millisUtc)
     Math.floor(millis.toDouble / MILLIS_PER_DAY).toLong
   }
 
-  /** Returns a [[java.sql.Date]] date given the days since epoch */
+  /** Returns a [[java.sql.Date]] date given the days since epoch. */
   def daysToDate(days: Long): Date = {
     val date = UnixEpochDateTime.plusDays(days)
     val millis = date.atZone(ZoneId.systemDefault).toInstant.toEpochMilli
diff --git a/src/test/scala/com/exasol/cloudetl/parquet/ParquetRowReaderTest.scala b/src/test/scala/com/exasol/cloudetl/parquet/ParquetRowReaderTest.scala
new file mode 100644
index 00000000..bc467341
--- /dev/null
+++ b/src/test/scala/com/exasol/cloudetl/parquet/ParquetRowReaderTest.scala
@@ -0,0 +1,141 @@
+package com.exasol.cloudetl.parquet
+
+import java.nio.file.Path
+
+import com.exasol.cloudetl.DummyRecordsTest
+import com.exasol.cloudetl.data.Row
+import com.exasol.cloudetl.source.ParquetSource
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{Path => HPath}
+import org.apache.hadoop.fs.FileSystem
+import org.apache.parquet.example.data.Group
+import org.apache.parquet.example.data.GroupWriter
+import org.apache.parquet.example.data.simple.SimpleGroup
+import org.apache.parquet.hadoop.ParquetWriter
+import org.apache.parquet.hadoop.api.WriteSupport
+import org.apache.parquet.io.api.RecordConsumer
+import org.apache.parquet.schema.MessageType
+import org.apache.parquet.schema.MessageTypeParser
+import org.scalatest.BeforeAndAfterEach
+import org.scalatest.FunSuite
+
+class ParquetRowReaderTest extends FunSuite with BeforeAndAfterEach with DummyRecordsTest {
+
+  private[this] var conf: Configuration = _
+  private[this] var fileSystem: FileSystem = _
+  private[this] var outputDirectory: Path = _
+  private[this] var path: HPath = _
+
+  override final def beforeEach(): Unit = {
+    conf = new Configuration
+    fileSystem = FileSystem.get(conf)
+    outputDirectory = createTemporaryFolder("parquetRowReaderTest")
+    path = new HPath(outputDirectory.toUri.toString, "part-00000.parquet")
+    ()
+  }
+
+  override final def afterEach(): Unit = {
+    deleteFiles(outputDirectory)
+    ()
+  }
+
+  private[this] def write(schema: MessageType, record: SimpleGroup): Unit = {
+    val writer = BaseGroupWriterBuilder(path, schema).build()
+    writer.write(record)
+    writer.close()
+  }
+
+  test("read throws if parquet record has complex types") {
+    val schema = MessageTypeParser
+      .parseMessageType("""message user {
+                          |  required binary name (UTF8);
+                          |  optional group contacts {
+                          |    repeated group array {
+                          |      required binary name (UTF8);
+                          |      optional binary phoneNumber (UTF8);
+                          |    }
+                          |  }
+                          |}
+        """.stripMargin)
+    val record = new SimpleGroup(schema)
+    record.add(0, "A. Name")
+    val contacts = record.addGroup(1)
+    contacts.addGroup(0).append("name", "A. Contact").append("phoneNumber", "1337")
+    contacts.addGroup(0).append("name", "Second Contact")
+    write(schema, record)
+
+    val thrown = intercept[UnsupportedOperationException] {
+      ParquetSource(path, conf, fileSystem).stream().size
+    }
+    assert(thrown.getMessage === "Currently only primitive types are supported")
+  }
+
+  test("reads INT64 (TIMESTAMP_MILLIS) as Timestamp value") {
+    val schema = MessageTypeParser
+      .parseMessageType("""message test {
+                          |  required int64 col_long;
+                          |  required int64 col_timestamp (TIMESTAMP_MILLIS);
+                          |}
+        """.stripMargin)
+    val record = new SimpleGroup(schema)
+    record.append("col_long", 153L)
+    record.append("col_timestamp", TIMESTAMP_VALUE1.getTime())
+    write(schema, record)
+
+    val src = ParquetSource(path, conf, fileSystem)
+    assert(src.stream().toSeq(0) === Row(Seq(153L, TIMESTAMP_VALUE1)))
+  }
+
+  test("reads non-decimal FIXED_LEN_BYTE_ARRAY as String value") {
+    val size = 5
+    val schema = MessageTypeParser
+      .parseMessageType(s"""message test {
+                           |  required fixed_len_byte_array($size) col_byte_array;
+                           |}
+        """.stripMargin)
+    val record = new SimpleGroup(schema)
+    record.append("col_byte_array", "hello")
+    write(schema, record)
+
+    val src = ParquetSource(path, conf, fileSystem)
+    assert(src.stream().toSeq(0) === Row(Seq("hello")))
+  }
+
+  test("reads non-UTF8 BINARY as String value") {
+    val schema = MessageTypeParser
+      .parseMessageType(s"""message test {
+                           |  required binary col_binary;
+                           |}
+        """.stripMargin)
+    val record = new SimpleGroup(schema)
+    record.append("col_binary", "test")
+    write(schema, record)
+
+    val src = ParquetSource(path, conf, fileSystem)
+    assert(src.stream().toSeq(0) === Row(Seq("test")))
+  }
+
+  private[this] case class BaseGroupWriteSupport(schema: MessageType)
+      extends WriteSupport[Group] {
+    var writer: GroupWriter = null
+
+    override def prepareForWrite(recordConsumer: RecordConsumer): Unit =
+      writer = new GroupWriter(recordConsumer, schema)
+
+    override def init(configuration: Configuration): WriteSupport.WriteContext =
+      new WriteSupport.WriteContext(schema, new java.util.HashMap[String, String]())
+
+    override def write(record: Group): Unit =
+      writer.write(record)
+  }
+
+  private[this] case class BaseGroupWriterBuilder(path: HPath, schema: MessageType)
+      extends ParquetWriter.Builder[Group, BaseGroupWriterBuilder](path) {
+    override def getWriteSupport(conf: Configuration): WriteSupport[Group] =
+      BaseGroupWriteSupport(schema)
+    override def self(): BaseGroupWriterBuilder = this
+  }
+
+}
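
Note (not part of the patch): the new RowTimestampInt96Converter composes the two DateTimeUtil helpers shown above. A minimal sketch of that composition follows, assuming the common Parquet/Impala INT96 layout of 8 little-endian bytes of nanoseconds-of-day followed by 4 little-endian bytes of Julian day; the helper name int96ToTimestamp is hypothetical.

import java.nio.{ByteBuffer, ByteOrder}
import java.sql.Timestamp

import com.exasol.cloudetl.util.DateTimeUtil

// Hypothetical sketch: decode a 12-byte Parquet INT96 value into a Timestamp.
def int96ToTimestamp(bytes: Array[Byte]): Timestamp = {
  val buffer = ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN)
  val nanosOfDay = buffer.getLong() // first 8 bytes: nanoseconds within the day
  val julianDay = buffer.getInt()   // last 4 bytes: Julian day number
  val micros = DateTimeUtil.getMicrosFromJulianDay(julianDay, nanosOfDay)
  DateTimeUtil.getTimestampFromMicros(micros)
}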
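
The two Julian-day helpers are inverses of each other for non-negative microsecond offsets, so a quick round-trip on the REPL (an ad-hoc check, not part of the test suite added in this diff) can confirm the conversion is lossless:

import java.sql.Timestamp
import com.exasol.cloudetl.util.DateTimeUtil

val micros = DateTimeUtil.getMicrosFromTimestamp(Timestamp.valueOf("2019-01-01 12:34:56.789"))
val (day, nanos) = DateTimeUtil.getJulianDayAndNanos(micros)
assert(DateTimeUtil.getMicrosFromJulianDay(day, nanos) == micros)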