Merge pull request #53 from exasol/feature/import-parquet-int64-millis
Fix Parquet import of timestamps encoded as milliseconds since epoch.
morazow authored Nov 28, 2019
2 parents c490ca5 + d2a3dfa commit 882ac14
Showing 3 changed files with 180 additions and 18 deletions.
20 changes: 11 additions & 9 deletions src/main/scala/com/exasol/cloudetl/parquet/RowReadSupport.scala
@@ -124,7 +124,7 @@ class RowRootConverter(schema: GroupType) extends GroupConverter {
}
case PrimitiveTypeName.INT64 =>
originalType match {
case OriginalType.TIMESTAMP_MILLIS => new RowTimestampConverter(this, idx)
case OriginalType.TIMESTAMP_MILLIS => new RowTimestampMillisConverter(this, idx)
case OriginalType.DECIMAL =>
val decimalMetadata = primitiveType.getDecimalMetadata
new RowDecimalConverter(
@@ -136,20 +136,15 @@ class RowRootConverter(schema: GroupType) extends GroupConverter {
case _ => new RowPrimitiveConverter(this, idx)
}

case PrimitiveTypeName.INT96 => new RowTimestampConverter(this, idx)

case _ =>
throw new UnsupportedOperationException(
s"Parquet type '$typeName' cannot be read into Exasol type."
)
case PrimitiveTypeName.INT96 => new RowTimestampInt96Converter(this, idx)
}
}

private final class RowPrimitiveConverter(val parent: RowRootConverter, val index: Int)
extends PrimitiveConverter {

override def addBinary(value: Binary): Unit =
parent.currentResult.update(index, value.getBytes())
parent.currentResult.update(index, new String(value.getBytes()))

override def addBoolean(value: Boolean): Unit =
parent.currentResult.update(index, value)
@@ -194,7 +189,14 @@ class RowRootConverter(schema: GroupType) extends GroupConverter {
}
}

private final class RowTimestampConverter(val parent: RowRootConverter, val index: Int)
private final class RowTimestampMillisConverter(val parent: RowRootConverter, val index: Int)
extends PrimitiveConverter {

override def addLong(value: Long): Unit =
parent.currentResult.update(index, DateTimeUtil.getTimestampFromMillis(value))
}

private final class RowTimestampInt96Converter(val parent: RowRootConverter, val index: Int)
extends PrimitiveConverter {

override def addBinary(value: Binary): Unit = {
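For context, the converter now distinguishes the two Parquet timestamp encodings: INT64 columns annotated as TIMESTAMP_MILLIS carry milliseconds since the Unix epoch as a plain long, while INT96 values use the conventional Impala-style layout of nanoseconds-of-day plus a Julian day number packed into twelve bytes. The following is a minimal, self-contained sketch of those two decodings; the names are illustrative and are not the project's converter classes.

import java.nio.{ByteBuffer, ByteOrder}
import java.sql.Timestamp

// Illustrative sketch only -- not the project's converter classes.
object TimestampDecodingSketch {
  private val JulianDayOfEpoch = 2440588L // Julian day number of 1970-01-01
  private val MillisPerDay = 86400000L

  // INT64 (TIMESTAMP_MILLIS): the long is already milliseconds since epoch.
  def fromTimestampMillis(millis: Long): Timestamp =
    new Timestamp(millis)

  // INT96: 12 little-endian bytes -- 8 bytes nanoseconds-of-day, then 4 bytes Julian day.
  // Simplified to millisecond precision for this sketch.
  def fromInt96(bytes: Array[Byte]): Timestamp = {
    val buffer = ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN)
    val nanosOfDay = buffer.getLong()
    val julianDay = buffer.getInt().toLong
    val millis = (julianDay - JulianDayOfEpoch) * MillisPerDay + nanosOfDay / 1000000L
    new Timestamp(millis)
  }
}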
37 changes: 28 additions & 9 deletions src/main/scala/com/exasol/cloudetl/util/DateTimeUtil.scala
@@ -6,7 +6,7 @@ import java.time._
import java.util.TimeZone

/**
* Helper functions to convert date time values
* Helper functions to convert date time values.
*/
object DateTimeUtil {
// scalastyle:off magic.number
@@ -21,11 +21,14 @@ object DateTimeUtil {
val MICROS_PER_SECOND: Long = MICROS_PER_MILLIS * MILLIS_PER_SECOND
val MICROS_PER_DAY: Long = MICROS_PER_SECOND * SECONDS_PER_DAY

/** Returns a [[java.sql.Timestamp]] timestamp from number of microseconds since epoch */
/**
* Returns a [[java.sql.Timestamp]] timestamp from number of
* microseconds since epoch.
*/
@SuppressWarnings(Array("org.wartremover.warts.Var"))
def getTimestampFromMicros(us: Long): Timestamp = {
// setNanos() will overwrite the millisecond part, so the milliseconds should be cut off at
// seconds
// setNanos() will overwrite the millisecond part, so the
// milliseconds should be cut off at seconds
var seconds = us / MICROS_PER_SECOND
var micros = us % MICROS_PER_SECOND
if (micros < 0) { // setNanos() can not accept negative value
@@ -38,15 +41,28 @@ object DateTimeUtil {
ts
}

/** Returns the number of micros since epoch from [[java.sql.Timestamp]] */
/**
* Returns a [[java.sql.Timestamp]] timestamp from number of
* milliseconds since epoch.
*/
def getTimestampFromMillis(millis: Long): Timestamp =
new Timestamp(millis)

/**
* Returns the number of micros since epoch from
* [[java.sql.Timestamp]].
*/
def getMicrosFromTimestamp(ts: Timestamp): Long =
if (ts != null) {
ts.getTime() * 1000L + (ts.getNanos().toLong / 1000) % 1000L
} else {
0L
}

/** Returns Julian day and nanoseconds in a day from microseconds since epoch */
/**
* Returns Julian day and nanoseconds in a day from microseconds since
* epoch.
*/
@SuppressWarnings(Array("org.wartremover.contrib.warts.ExposedTuples"))
def getJulianDayAndNanos(us: Long): (Int, Long) = {
val julian_us = us + JULIAN_DAY_OF_EPOCH * MICROS_PER_DAY
@@ -55,21 +71,24 @@ object DateTimeUtil {
(day.toInt, micros * 1000L)
}

/** Returns microseconds since epoch from Julian day and nanoseconds in a day */
/**
* Returns microseconds since epoch from Julian day and nanoseconds in
* a day.
*/
def getMicrosFromJulianDay(day: Int, nanos: Long): Long = {
val seconds = (day - JULIAN_DAY_OF_EPOCH).toLong * SECONDS_PER_DAY
seconds * MICROS_PER_SECOND + nanos / 1000L
}

/** Returns the number of days since unix epoch */
/** Returns the number of days since unix epoch. */
@SuppressWarnings(Array("org.wartremover.contrib.warts.OldTime"))
def daysSinceEpoch(date: Date): Long = {
val millisUtc = date.getTime
val millis = millisUtc + (TimeZone.getTimeZone(ZoneId.systemDefault).getOffset(millisUtc))
Math.floor(millis.toDouble / MILLIS_PER_DAY).toLong
}

/** Returns a [[java.sql.Date]] date given the days since epoch */
/** Returns a [[java.sql.Date]] date given the days since epoch. */
def daysToDate(days: Long): Date = {
val date = UnixEpochDateTime.plusDays(days)
val millis = date.atZone(ZoneId.systemDefault).toInstant.toEpochMilli
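A small usage sketch of the conversions above, assuming the DateTimeUtil object from this diff is on the classpath; the sample epoch value is illustrative.

import java.sql.Timestamp

import com.exasol.cloudetl.util.DateTimeUtil

// Usage sketch for the helpers added and extended above.
object DateTimeUtilUsageSketch extends App {
  val millis = 1574894400000L // 2019-11-27 22:40:00 UTC, illustrative value

  // New helper: milliseconds since epoch -> java.sql.Timestamp
  val ts: Timestamp = DateTimeUtil.getTimestampFromMillis(millis)

  // Round trip through microseconds; for a millisecond-precision value
  // micros is simply millis * 1000.
  val micros: Long = DateTimeUtil.getMicrosFromTimestamp(ts)
  val ts2: Timestamp = DateTimeUtil.getTimestampFromMicros(micros)

  assert(micros == millis * 1000L)
  assert(ts2 == ts)
}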
141 changes: 141 additions & 0 deletions src/test/scala/com/exasol/cloudetl/parquet/ParquetRowReaderTest.scala
@@ -0,0 +1,141 @@
package com.exasol.cloudetl.parquet

import java.nio.file.Path

import com.exasol.cloudetl.DummyRecordsTest
import com.exasol.cloudetl.data.Row
import com.exasol.cloudetl.source.ParquetSource

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Path => HPath}
import org.apache.hadoop.fs.FileSystem
import org.apache.parquet.example.data.Group
import org.apache.parquet.example.data.GroupWriter
import org.apache.parquet.example.data.simple.SimpleGroup
import org.apache.parquet.hadoop.ParquetWriter
import org.apache.parquet.hadoop.api.WriteSupport
import org.apache.parquet.io.api.RecordConsumer
import org.apache.parquet.schema.MessageType
import org.apache.parquet.schema.MessageTypeParser
import org.scalatest.BeforeAndAfterEach
import org.scalatest.FunSuite

class ParquetRowReaderTest extends FunSuite with BeforeAndAfterEach with DummyRecordsTest {

private[this] var conf: Configuration = _
private[this] var fileSystem: FileSystem = _
private[this] var outputDirectory: Path = _
private[this] var path: HPath = _

override final def beforeEach(): Unit = {
conf = new Configuration
fileSystem = FileSystem.get(conf)
outputDirectory = createTemporaryFolder("parquetRowReaderTest")
path = new HPath(outputDirectory.toUri.toString, "part-00000.parquet")
()
}

override final def afterEach(): Unit = {
deleteFiles(outputDirectory)
()
}

private[this] def write(schema: MessageType, record: SimpleGroup): Unit = {
val writer = BaseGroupWriterBuilder(path, schema).build()
writer.write(record)
writer.close()
}

test("read throws if parquet record has complex types") {
val schema = MessageTypeParser
.parseMessageType("""message user {
| required binary name (UTF8);
| optional group contacts {
| repeated group array {
| required binary name (UTF8);
| optional binary phoneNumber (UTF8);
| }
| }
|}
""".stripMargin)
val record = new SimpleGroup(schema)
record.add(0, "A. Name")
val contacts = record.addGroup(1)
contacts.addGroup(0).append("name", "A. Contact").append("phoneNumber", "1337")
contacts.addGroup(0).append("name", "Second Contact")
write(schema, record)

val thrown = intercept[UnsupportedOperationException] {
ParquetSource(path, conf, fileSystem).stream().size
}
assert(thrown.getMessage === "Currently only primitive types are supported")
}

test("reads INT64 (TIMESTAMP_MILLIS) as Timestamp value") {

val schema = MessageTypeParser
.parseMessageType("""message test {
| required int64 col_long;
| required int64 col_timestamp (TIMESTAMP_MILLIS);
|}
""".stripMargin)
val record = new SimpleGroup(schema)
record.append("col_long", 153L)
record.append("col_timestamp", TIMESTAMP_VALUE1.getTime())
write(schema, record)

val src = ParquetSource(path, conf, fileSystem)
assert(src.stream().toSeq(0) === Row(Seq(153L, TIMESTAMP_VALUE1)))
}

test("reads non-decimal FIXED_LEN_BYTE_ARRAY as String value") {
val size = 5
val schema = MessageTypeParser
.parseMessageType(s"""message test {
| required fixed_len_byte_array($size) col_byte_array;
|}
""".stripMargin)
val record = new SimpleGroup(schema)
record.append("col_byte_array", "hello")
write(schema, record)

val src = ParquetSource(path, conf, fileSystem)
assert(src.stream().toSeq(0) === Row(Seq("hello")))
}

test("reads non-UTF8 BINARY as String value") {
val schema = MessageTypeParser
.parseMessageType(s"""message test {
| required binary col_binary;
|}
""".stripMargin)
val record = new SimpleGroup(schema)
record.append("col_binary", "test")
write(schema, record)

val src = ParquetSource(path, conf, fileSystem)
assert(src.stream().toSeq(0) === Row(Seq("test")))
}

private[this] case class BaseGroupWriteSupport(schema: MessageType)
extends WriteSupport[Group] {
var writer: GroupWriter = null

override def prepareForWrite(recordConsumer: RecordConsumer): Unit =
writer = new GroupWriter(recordConsumer, schema)

override def init(configuration: Configuration): WriteSupport.WriteContext =
new WriteSupport.WriteContext(schema, new java.util.HashMap[String, String]())

override def write(record: Group): Unit =
writer.write(record)
}

private[this] case class BaseGroupWriterBuilder(path: HPath, schema: MessageType)
extends ParquetWriter.Builder[Group, BaseGroupWriterBuilder](path) {
override def getWriteSupport(conf: Configuration): WriteSupport[Group] =
BaseGroupWriteSupport(schema)
override def self(): BaseGroupWriterBuilder = this
}

}
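Outside the test harness, reading such a file goes through the same ParquetSource API the test exercises. A rough usage sketch follows, assuming the ParquetSource(path, conf, fileSystem) constructor and stream() method shown above; the file path is illustrative.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Path => HPath}
import org.apache.hadoop.fs.FileSystem

import com.exasol.cloudetl.data.Row
import com.exasol.cloudetl.source.ParquetSource

// Usage sketch, assuming the ParquetSource constructor and stream() method
// exercised by the test above; the path below is illustrative.
object ParquetReadSketch extends App {
  val conf = new Configuration()
  val fileSystem = FileSystem.get(conf)
  val path = new HPath("/tmp/part-00000.parquet")

  // INT64 (TIMESTAMP_MILLIS) columns arrive as java.sql.Timestamp values inside each Row.
  val rows: Seq[Row] = ParquetSource(path, conf, fileSystem).stream().toSeq
  rows.foreach(println)
}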
