From b5c614cc3942109ba517b7ec125703b5331f9c2e Mon Sep 17 00:00:00 2001 From: Laurent Perez Date: Thu, 17 Oct 2024 20:53:26 +0200 Subject: [PATCH 1/7] Implement TimeStampMicroTZVector for parquet isAdjustedToUTC timestamp columns #926 --- .../kotlinx/dataframe/io/arrowReadingImpl.kt | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/dataframe-arrow/src/main/kotlin/org/jetbrains/kotlinx/dataframe/io/arrowReadingImpl.kt b/dataframe-arrow/src/main/kotlin/org/jetbrains/kotlinx/dataframe/io/arrowReadingImpl.kt index f7c7eb9407..ea835530cb 100644 --- a/dataframe-arrow/src/main/kotlin/org/jetbrains/kotlinx/dataframe/io/arrowReadingImpl.kt +++ b/dataframe-arrow/src/main/kotlin/org/jetbrains/kotlinx/dataframe/io/arrowReadingImpl.kt @@ -1,11 +1,14 @@ package org.jetbrains.kotlinx.dataframe.io +import kotlinx.datetime.Instant import kotlinx.datetime.LocalDate import kotlinx.datetime.LocalDateTime import kotlinx.datetime.LocalTime +import kotlinx.datetime.TimeZone import kotlinx.datetime.toKotlinLocalDate import kotlinx.datetime.toKotlinLocalDateTime import kotlinx.datetime.toKotlinLocalTime +import kotlinx.datetime.toLocalDateTime import org.apache.arrow.memory.RootAllocator import org.apache.arrow.vector.BigIntVector import org.apache.arrow.vector.BitVector @@ -21,6 +24,7 @@ import org.apache.arrow.vector.LargeVarBinaryVector import org.apache.arrow.vector.LargeVarCharVector import org.apache.arrow.vector.NullVector import org.apache.arrow.vector.SmallIntVector +import org.apache.arrow.vector.TimeStampMicroTZVector import org.apache.arrow.vector.TimeMicroVector import org.apache.arrow.vector.TimeMilliVector import org.apache.arrow.vector.TimeNanoVector @@ -179,6 +183,16 @@ private fun TimeStampMicroVector.values(range: IntRange): List = } } +private fun TimeStampMicroTZVector.values(range: IntRange): List = + range.mapIndexed { i, it -> + if (isNull(i)) { + null + } else { + Instant.fromEpochMilliseconds(getObject(it) / 1000) + .toLocalDateTime(TimeZone.of(this.timeZone)) + } + } + private fun TimeStampMilliVector.values(range: IntRange): List = range.mapIndexed { i, it -> if (isNull(i)) { @@ -345,6 +359,8 @@ private fun readField(root: VectorSchemaRoot, field: Field, nullability: Nullabi is TimeStampMicroVector -> vector.values(range).withTypeNullable(field.isNullable, nullability) + is TimeStampMicroTZVector -> vector.values(range).withTypeNullable(field.isNullable, nullability) + is TimeStampMilliVector -> vector.values(range).withTypeNullable(field.isNullable, nullability) is TimeStampSecVector -> vector.values(range).withTypeNullable(field.isNullable, nullability) From d5bd3a2cca6a2c14f007a291342026d4d28c7ce6 Mon Sep 17 00:00:00 2001 From: Laurent Perez Date: Fri, 18 Oct 2024 14:59:38 +0200 Subject: [PATCH 2/7] test for TimeStampMicroTZVector --- .../kotlinx/dataframe/io/arrowReadingImpl.kt | 2 +- .../jetbrains/kotlinx/dataframe/io/ArrowKtTest.kt | 13 ++++++++++++- 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/dataframe-arrow/src/main/kotlin/org/jetbrains/kotlinx/dataframe/io/arrowReadingImpl.kt b/dataframe-arrow/src/main/kotlin/org/jetbrains/kotlinx/dataframe/io/arrowReadingImpl.kt index ea835530cb..5ee90fb027 100644 --- a/dataframe-arrow/src/main/kotlin/org/jetbrains/kotlinx/dataframe/io/arrowReadingImpl.kt +++ b/dataframe-arrow/src/main/kotlin/org/jetbrains/kotlinx/dataframe/io/arrowReadingImpl.kt @@ -188,7 +188,7 @@ private fun TimeStampMicroTZVector.values(range: IntRange): List if (isNull(i)) { null } else { - Instant.fromEpochMilliseconds(getObject(it) / 1000) + Instant.fromEpochMilliseconds(getObject(it) / 1_000L) .toLocalDateTime(TimeZone.of(this.timeZone)) } } diff --git a/dataframe-arrow/src/test/kotlin/org/jetbrains/kotlinx/dataframe/io/ArrowKtTest.kt b/dataframe-arrow/src/test/kotlin/org/jetbrains/kotlinx/dataframe/io/ArrowKtTest.kt index 78356f4fed..77b2ed0518 100644 --- a/dataframe-arrow/src/test/kotlin/org/jetbrains/kotlinx/dataframe/io/ArrowKtTest.kt +++ b/dataframe-arrow/src/test/kotlin/org/jetbrains/kotlinx/dataframe/io/ArrowKtTest.kt @@ -9,6 +9,7 @@ import kotlinx.datetime.UtcOffset import kotlinx.datetime.toInstant import kotlinx.datetime.toJavaInstant import org.apache.arrow.memory.RootAllocator +import org.apache.arrow.vector.TimeStampMicroTZVector import org.apache.arrow.vector.TimeStampMicroVector import org.apache.arrow.vector.TimeStampMilliVector import org.apache.arrow.vector.TimeStampNanoVector @@ -533,6 +534,7 @@ internal class ArrowKtTest { val dataFrame = dataFrameOf( "ts_nano" to dates, "ts_micro" to dates, + "ts_micro_tz" to dates, "ts_milli" to dates, "ts_sec" to dates, ) @@ -555,6 +557,12 @@ internal class ArrowKtTest { null, ) + val timeStampMicroTZ = Field( + "ts_micro_tz", + FieldType.nullable(ArrowType.Timestamp(TimeUnit.MICROSECOND, "UTC")), + null, + ) + val timeStampNano = Field( "ts_nano", FieldType.nullable(ArrowType.Timestamp(TimeUnit.NANOSECOND, null)), @@ -567,22 +575,25 @@ internal class ArrowKtTest { null, ) val schemaTimeStamp = Schema( - listOf(timeStampNano, timeStampMicro, timeStampMilli, timeStampSec), + listOf(timeStampNano, timeStampMicro, timeStampMicroTZ, timeStampMilli, timeStampSec), ) VectorSchemaRoot.create(schemaTimeStamp, allocator).use { vectorSchemaRoot -> val timeStampMilliVector = vectorSchemaRoot.getVector("ts_milli") as TimeStampMilliVector val timeStampNanoVector = vectorSchemaRoot.getVector("ts_nano") as TimeStampNanoVector val timeStampMicroVector = vectorSchemaRoot.getVector("ts_micro") as TimeStampMicroVector + val timeStampMicroTZVector = vectorSchemaRoot.getVector("ts_micro_tz") as TimeStampMicroTZVector val timeStampSecVector = vectorSchemaRoot.getVector("ts_sec") as TimeStampSecVector timeStampMilliVector.allocateNew(dates.size) timeStampNanoVector.allocateNew(dates.size) timeStampMicroVector.allocateNew(dates.size) + timeStampMicroTZVector.allocateNew(dates.size) timeStampSecVector.allocateNew(dates.size) dates.forEachIndexed { index, localDateTime -> val instant = localDateTime.toInstant(UtcOffset.ZERO).toJavaInstant() timeStampNanoVector[index] = instant.toEpochMilli() * 1_000_000L + instant.nano timeStampMicroVector[index] = instant.toEpochMilli() * 1_000L + timeStampMicroTZVector[index] = instant.toEpochMilli() * 1_000L timeStampMilliVector[index] = instant.toEpochMilli() timeStampSecVector[index] = instant.toEpochMilli() / 1_000L } From 330951c386e8e6696ae5e4abb3258190db078403 Mon Sep 17 00:00:00 2001 From: Laurent Perez Date: Fri, 18 Oct 2024 16:34:48 +0200 Subject: [PATCH 3/7] add support for TimeStampNanoTZVector --- .../kotlinx/dataframe/io/arrowReadingImpl.kt | 16 ++++++++++++++++ .../kotlinx/dataframe/io/ArrowKtTest.kt | 13 ++++++++++++- 2 files changed, 28 insertions(+), 1 deletion(-) diff --git a/dataframe-arrow/src/main/kotlin/org/jetbrains/kotlinx/dataframe/io/arrowReadingImpl.kt b/dataframe-arrow/src/main/kotlin/org/jetbrains/kotlinx/dataframe/io/arrowReadingImpl.kt index 5ee90fb027..381a15f187 100644 --- a/dataframe-arrow/src/main/kotlin/org/jetbrains/kotlinx/dataframe/io/arrowReadingImpl.kt +++ b/dataframe-arrow/src/main/kotlin/org/jetbrains/kotlinx/dataframe/io/arrowReadingImpl.kt @@ -31,6 +31,7 @@ import org.apache.arrow.vector.TimeNanoVector import org.apache.arrow.vector.TimeSecVector import org.apache.arrow.vector.TimeStampMicroVector import org.apache.arrow.vector.TimeStampMilliVector +import org.apache.arrow.vector.TimeStampNanoTZVector import org.apache.arrow.vector.TimeStampNanoVector import org.apache.arrow.vector.TimeStampSecVector import org.apache.arrow.vector.TinyIntVector @@ -174,6 +175,19 @@ private fun TimeStampNanoVector.values(range: IntRange): List = } } +private fun TimeStampNanoTZVector.values(range: IntRange): List = + range.mapIndexed { i, it -> + if (isNull(i)) { + null + } else { + val nanoseconds = getObject(it) + val seconds = nanoseconds / 1_000_000_000 + val nanosAdjustment = nanoseconds % 1_000_000_000 + Instant.fromEpochSeconds(seconds, nanosAdjustment) + .toLocalDateTime(TimeZone.of(this.timeZone)) + } + } + private fun TimeStampMicroVector.values(range: IntRange): List = range.mapIndexed { i, it -> if (isNull(i)) { @@ -357,6 +371,8 @@ private fun readField(root: VectorSchemaRoot, field: Field, nullability: Nullabi is TimeStampNanoVector -> vector.values(range).withTypeNullable(field.isNullable, nullability) + is TimeStampNanoTZVector -> vector.values(range).withTypeNullable(field.isNullable, nullability) + is TimeStampMicroVector -> vector.values(range).withTypeNullable(field.isNullable, nullability) is TimeStampMicroTZVector -> vector.values(range).withTypeNullable(field.isNullable, nullability) diff --git a/dataframe-arrow/src/test/kotlin/org/jetbrains/kotlinx/dataframe/io/ArrowKtTest.kt b/dataframe-arrow/src/test/kotlin/org/jetbrains/kotlinx/dataframe/io/ArrowKtTest.kt index 77b2ed0518..306827341b 100644 --- a/dataframe-arrow/src/test/kotlin/org/jetbrains/kotlinx/dataframe/io/ArrowKtTest.kt +++ b/dataframe-arrow/src/test/kotlin/org/jetbrains/kotlinx/dataframe/io/ArrowKtTest.kt @@ -12,6 +12,7 @@ import org.apache.arrow.memory.RootAllocator import org.apache.arrow.vector.TimeStampMicroTZVector import org.apache.arrow.vector.TimeStampMicroVector import org.apache.arrow.vector.TimeStampMilliVector +import org.apache.arrow.vector.TimeStampNanoTZVector import org.apache.arrow.vector.TimeStampNanoVector import org.apache.arrow.vector.TimeStampSecVector import org.apache.arrow.vector.VectorSchemaRoot @@ -533,6 +534,7 @@ internal class ArrowKtTest { val dataFrame = dataFrameOf( "ts_nano" to dates, + "ts_nano_tz" to dates, "ts_micro" to dates, "ts_micro_tz" to dates, "ts_milli" to dates, @@ -569,22 +571,30 @@ internal class ArrowKtTest { null, ) + val timeStampNanoTZ = Field( + "ts_nano_tz", + FieldType.nullable(ArrowType.Timestamp(TimeUnit.NANOSECOND, "UTC")), + null, + ) + val timeStampSec = Field( "ts_sec", FieldType.nullable(ArrowType.Timestamp(TimeUnit.SECOND, null)), null, ) val schemaTimeStamp = Schema( - listOf(timeStampNano, timeStampMicro, timeStampMicroTZ, timeStampMilli, timeStampSec), + listOf(timeStampNano, timeStampNanoTZ, timeStampMicro, timeStampMicroTZ, timeStampMilli, timeStampSec), ) VectorSchemaRoot.create(schemaTimeStamp, allocator).use { vectorSchemaRoot -> val timeStampMilliVector = vectorSchemaRoot.getVector("ts_milli") as TimeStampMilliVector val timeStampNanoVector = vectorSchemaRoot.getVector("ts_nano") as TimeStampNanoVector + val timeStampNanoTZVector = vectorSchemaRoot.getVector("ts_nano_tz") as TimeStampNanoTZVector val timeStampMicroVector = vectorSchemaRoot.getVector("ts_micro") as TimeStampMicroVector val timeStampMicroTZVector = vectorSchemaRoot.getVector("ts_micro_tz") as TimeStampMicroTZVector val timeStampSecVector = vectorSchemaRoot.getVector("ts_sec") as TimeStampSecVector timeStampMilliVector.allocateNew(dates.size) timeStampNanoVector.allocateNew(dates.size) + timeStampNanoTZVector.allocateNew(dates.size) timeStampMicroVector.allocateNew(dates.size) timeStampMicroTZVector.allocateNew(dates.size) timeStampSecVector.allocateNew(dates.size) @@ -592,6 +602,7 @@ internal class ArrowKtTest { dates.forEachIndexed { index, localDateTime -> val instant = localDateTime.toInstant(UtcOffset.ZERO).toJavaInstant() timeStampNanoVector[index] = instant.toEpochMilli() * 1_000_000L + instant.nano + timeStampNanoTZVector[index] = instant.toEpochMilli() * 1_000_000L + instant.nano timeStampMicroVector[index] = instant.toEpochMilli() * 1_000L timeStampMicroTZVector[index] = instant.toEpochMilli() * 1_000L timeStampMilliVector[index] = instant.toEpochMilli() From 7bc05a4ce3fed34cae3795ccf6b7e981551ed5d7 Mon Sep 17 00:00:00 2001 From: Laurent Perez Date: Fri, 18 Oct 2024 21:30:27 +0200 Subject: [PATCH 4/7] fix precision of TimeStampMicroTZVector (div. 1_000L was wrong) --- .../org/jetbrains/kotlinx/dataframe/io/arrowReadingImpl.kt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dataframe-arrow/src/main/kotlin/org/jetbrains/kotlinx/dataframe/io/arrowReadingImpl.kt b/dataframe-arrow/src/main/kotlin/org/jetbrains/kotlinx/dataframe/io/arrowReadingImpl.kt index 381a15f187..b570c43295 100644 --- a/dataframe-arrow/src/main/kotlin/org/jetbrains/kotlinx/dataframe/io/arrowReadingImpl.kt +++ b/dataframe-arrow/src/main/kotlin/org/jetbrains/kotlinx/dataframe/io/arrowReadingImpl.kt @@ -202,8 +202,8 @@ private fun TimeStampMicroTZVector.values(range: IntRange): List if (isNull(i)) { null } else { - Instant.fromEpochMilliseconds(getObject(it) / 1_000L) - .toLocalDateTime(TimeZone.of(this.timeZone)) + DateUtility.getLocalDateTimeFromEpochMicro(getObject(it), this.timeZone) + .toKotlinLocalDateTime() } } From bd049d90182f45cc8ba4f0807b86f430ebfaa7b8 Mon Sep 17 00:00:00 2001 From: Laurent Perez Date: Fri, 18 Oct 2024 21:37:46 +0200 Subject: [PATCH 5/7] add support for TimeStampMilliTZVector --- .../kotlinx/dataframe/io/arrowReadingImpl.kt | 13 +++++++++++++ .../jetbrains/kotlinx/dataframe/io/ArrowKtTest.kt | 13 ++++++++++++- 2 files changed, 25 insertions(+), 1 deletion(-) diff --git a/dataframe-arrow/src/main/kotlin/org/jetbrains/kotlinx/dataframe/io/arrowReadingImpl.kt b/dataframe-arrow/src/main/kotlin/org/jetbrains/kotlinx/dataframe/io/arrowReadingImpl.kt index b570c43295..f485e8f332 100644 --- a/dataframe-arrow/src/main/kotlin/org/jetbrains/kotlinx/dataframe/io/arrowReadingImpl.kt +++ b/dataframe-arrow/src/main/kotlin/org/jetbrains/kotlinx/dataframe/io/arrowReadingImpl.kt @@ -30,6 +30,7 @@ import org.apache.arrow.vector.TimeMilliVector import org.apache.arrow.vector.TimeNanoVector import org.apache.arrow.vector.TimeSecVector import org.apache.arrow.vector.TimeStampMicroVector +import org.apache.arrow.vector.TimeStampMilliTZVector import org.apache.arrow.vector.TimeStampMilliVector import org.apache.arrow.vector.TimeStampNanoTZVector import org.apache.arrow.vector.TimeStampNanoVector @@ -216,6 +217,16 @@ private fun TimeStampMilliVector.values(range: IntRange): List = } } +private fun TimeStampMilliTZVector.values(range: IntRange): List = + range.mapIndexed { i, it -> + if (isNull(i)) { + null + } else { + Instant.fromEpochMilliseconds(getObject(it)) + .toLocalDateTime(TimeZone.of(this.timeZone)) + } + } + private fun TimeStampSecVector.values(range: IntRange): List = range.mapIndexed { i, it -> if (isNull(i)) { @@ -379,6 +390,8 @@ private fun readField(root: VectorSchemaRoot, field: Field, nullability: Nullabi is TimeStampMilliVector -> vector.values(range).withTypeNullable(field.isNullable, nullability) + is TimeStampMilliTZVector -> vector.values(range).withTypeNullable(field.isNullable, nullability) + is TimeStampSecVector -> vector.values(range).withTypeNullable(field.isNullable, nullability) is StructVector -> vector.values(range).withTypeNullable(field.isNullable, nullability) diff --git a/dataframe-arrow/src/test/kotlin/org/jetbrains/kotlinx/dataframe/io/ArrowKtTest.kt b/dataframe-arrow/src/test/kotlin/org/jetbrains/kotlinx/dataframe/io/ArrowKtTest.kt index 306827341b..cc9acc40c2 100644 --- a/dataframe-arrow/src/test/kotlin/org/jetbrains/kotlinx/dataframe/io/ArrowKtTest.kt +++ b/dataframe-arrow/src/test/kotlin/org/jetbrains/kotlinx/dataframe/io/ArrowKtTest.kt @@ -11,6 +11,7 @@ import kotlinx.datetime.toJavaInstant import org.apache.arrow.memory.RootAllocator import org.apache.arrow.vector.TimeStampMicroTZVector import org.apache.arrow.vector.TimeStampMicroVector +import org.apache.arrow.vector.TimeStampMilliTZVector import org.apache.arrow.vector.TimeStampMilliVector import org.apache.arrow.vector.TimeStampNanoTZVector import org.apache.arrow.vector.TimeStampNanoVector @@ -538,6 +539,7 @@ internal class ArrowKtTest { "ts_micro" to dates, "ts_micro_tz" to dates, "ts_milli" to dates, + "ts_milli_tz" to dates, "ts_sec" to dates, ) @@ -553,6 +555,12 @@ internal class ArrowKtTest { null, ) + val timeStampMilliTZ = Field( + "ts_milli_tz", + FieldType.nullable(ArrowType.Timestamp(TimeUnit.MILLISECOND, "UTC")), + null, + ) + val timeStampMicro = Field( "ts_micro", FieldType.nullable(ArrowType.Timestamp(TimeUnit.MICROSECOND, null)), @@ -583,16 +591,18 @@ internal class ArrowKtTest { null, ) val schemaTimeStamp = Schema( - listOf(timeStampNano, timeStampNanoTZ, timeStampMicro, timeStampMicroTZ, timeStampMilli, timeStampSec), + listOf(timeStampNano, timeStampNanoTZ, timeStampMicro, timeStampMicroTZ, timeStampMilli, timeStampMilliTZ, timeStampSec), ) VectorSchemaRoot.create(schemaTimeStamp, allocator).use { vectorSchemaRoot -> val timeStampMilliVector = vectorSchemaRoot.getVector("ts_milli") as TimeStampMilliVector + val timeStampMilliTZVector = vectorSchemaRoot.getVector("ts_milli_tz") as TimeStampMilliTZVector val timeStampNanoVector = vectorSchemaRoot.getVector("ts_nano") as TimeStampNanoVector val timeStampNanoTZVector = vectorSchemaRoot.getVector("ts_nano_tz") as TimeStampNanoTZVector val timeStampMicroVector = vectorSchemaRoot.getVector("ts_micro") as TimeStampMicroVector val timeStampMicroTZVector = vectorSchemaRoot.getVector("ts_micro_tz") as TimeStampMicroTZVector val timeStampSecVector = vectorSchemaRoot.getVector("ts_sec") as TimeStampSecVector timeStampMilliVector.allocateNew(dates.size) + timeStampMilliTZVector.allocateNew(dates.size) timeStampNanoVector.allocateNew(dates.size) timeStampNanoTZVector.allocateNew(dates.size) timeStampMicroVector.allocateNew(dates.size) @@ -606,6 +616,7 @@ internal class ArrowKtTest { timeStampMicroVector[index] = instant.toEpochMilli() * 1_000L timeStampMicroTZVector[index] = instant.toEpochMilli() * 1_000L timeStampMilliVector[index] = instant.toEpochMilli() + timeStampMilliTZVector[index] = instant.toEpochMilli() timeStampSecVector[index] = instant.toEpochMilli() / 1_000L } vectorSchemaRoot.setRowCount(dates.size) From cbdf8f74e565a9131606dbba9c3b234ad6eb5c0e Mon Sep 17 00:00:00 2001 From: Laurent Perez Date: Fri, 18 Oct 2024 21:46:13 +0200 Subject: [PATCH 6/7] use proper utility method for TimeStampNanoTZVector --- .../org/jetbrains/kotlinx/dataframe/io/arrowReadingImpl.kt | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/dataframe-arrow/src/main/kotlin/org/jetbrains/kotlinx/dataframe/io/arrowReadingImpl.kt b/dataframe-arrow/src/main/kotlin/org/jetbrains/kotlinx/dataframe/io/arrowReadingImpl.kt index f485e8f332..059233a192 100644 --- a/dataframe-arrow/src/main/kotlin/org/jetbrains/kotlinx/dataframe/io/arrowReadingImpl.kt +++ b/dataframe-arrow/src/main/kotlin/org/jetbrains/kotlinx/dataframe/io/arrowReadingImpl.kt @@ -181,11 +181,8 @@ private fun TimeStampNanoTZVector.values(range: IntRange): List if (isNull(i)) { null } else { - val nanoseconds = getObject(it) - val seconds = nanoseconds / 1_000_000_000 - val nanosAdjustment = nanoseconds % 1_000_000_000 - Instant.fromEpochSeconds(seconds, nanosAdjustment) - .toLocalDateTime(TimeZone.of(this.timeZone)) + DateUtility.getLocalDateTimeFromEpochNano(getObject(it), this.timeZone) + .toKotlinLocalDateTime() } } From e12123243ca5211373a6c0ed4c7afc19e49de630 Mon Sep 17 00:00:00 2001 From: Laurent Perez Date: Fri, 18 Oct 2024 22:22:04 +0200 Subject: [PATCH 7/7] add support for timeStampSecTZVector --- .../kotlinx/dataframe/io/arrowReadingImpl.kt | 16 +++++++++++++++- .../kotlinx/dataframe/io/ArrowKtTest.kt | 14 +++++++++++++- 2 files changed, 28 insertions(+), 2 deletions(-) diff --git a/dataframe-arrow/src/main/kotlin/org/jetbrains/kotlinx/dataframe/io/arrowReadingImpl.kt b/dataframe-arrow/src/main/kotlin/org/jetbrains/kotlinx/dataframe/io/arrowReadingImpl.kt index 059233a192..2a1b70b9b6 100644 --- a/dataframe-arrow/src/main/kotlin/org/jetbrains/kotlinx/dataframe/io/arrowReadingImpl.kt +++ b/dataframe-arrow/src/main/kotlin/org/jetbrains/kotlinx/dataframe/io/arrowReadingImpl.kt @@ -24,16 +24,17 @@ import org.apache.arrow.vector.LargeVarBinaryVector import org.apache.arrow.vector.LargeVarCharVector import org.apache.arrow.vector.NullVector import org.apache.arrow.vector.SmallIntVector -import org.apache.arrow.vector.TimeStampMicroTZVector import org.apache.arrow.vector.TimeMicroVector import org.apache.arrow.vector.TimeMilliVector import org.apache.arrow.vector.TimeNanoVector import org.apache.arrow.vector.TimeSecVector +import org.apache.arrow.vector.TimeStampMicroTZVector import org.apache.arrow.vector.TimeStampMicroVector import org.apache.arrow.vector.TimeStampMilliTZVector import org.apache.arrow.vector.TimeStampMilliVector import org.apache.arrow.vector.TimeStampNanoTZVector import org.apache.arrow.vector.TimeStampNanoVector +import org.apache.arrow.vector.TimeStampSecTZVector import org.apache.arrow.vector.TimeStampSecVector import org.apache.arrow.vector.TinyIntVector import org.apache.arrow.vector.UInt1Vector @@ -69,6 +70,7 @@ import java.math.BigDecimal import java.math.BigInteger import java.nio.channels.ReadableByteChannel import java.nio.channels.SeekableByteChannel +import java.util.concurrent.TimeUnit import kotlin.reflect.KType import kotlin.reflect.full.withNullability import kotlin.reflect.typeOf @@ -233,6 +235,16 @@ private fun TimeStampSecVector.values(range: IntRange): List = } } +private fun TimeStampSecTZVector.values(range: IntRange): List = + range.mapIndexed { i, it -> + if (isNull(i)) { + null + } else { + DateUtility.getLocalDateTimeFromEpochMilli(TimeUnit.SECONDS.toMillis(getObject(it)), this.timeZone) + .toKotlinLocalDateTime() + } + } + private fun StructVector.values(range: IntRange): List?> = range.map { getObject(it) @@ -391,6 +403,8 @@ private fun readField(root: VectorSchemaRoot, field: Field, nullability: Nullabi is TimeStampSecVector -> vector.values(range).withTypeNullable(field.isNullable, nullability) + is TimeStampSecTZVector -> vector.values(range).withTypeNullable(field.isNullable, nullability) + is StructVector -> vector.values(range).withTypeNullable(field.isNullable, nullability) is NullVector -> vector.values(range).withTypeNullable(field.isNullable, nullability) diff --git a/dataframe-arrow/src/test/kotlin/org/jetbrains/kotlinx/dataframe/io/ArrowKtTest.kt b/dataframe-arrow/src/test/kotlin/org/jetbrains/kotlinx/dataframe/io/ArrowKtTest.kt index cc9acc40c2..95108db052 100644 --- a/dataframe-arrow/src/test/kotlin/org/jetbrains/kotlinx/dataframe/io/ArrowKtTest.kt +++ b/dataframe-arrow/src/test/kotlin/org/jetbrains/kotlinx/dataframe/io/ArrowKtTest.kt @@ -15,6 +15,7 @@ import org.apache.arrow.vector.TimeStampMilliTZVector import org.apache.arrow.vector.TimeStampMilliVector import org.apache.arrow.vector.TimeStampNanoTZVector import org.apache.arrow.vector.TimeStampNanoVector +import org.apache.arrow.vector.TimeStampSecTZVector import org.apache.arrow.vector.TimeStampSecVector import org.apache.arrow.vector.VectorSchemaRoot import org.apache.arrow.vector.ipc.ArrowFileReader @@ -541,6 +542,7 @@ internal class ArrowKtTest { "ts_milli" to dates, "ts_milli_tz" to dates, "ts_sec" to dates, + "ts_sec_tz" to dates, ) DataFrame.readArrowFeather(writeArrowTimestamp(dates)) shouldBe dataFrame @@ -590,8 +592,15 @@ internal class ArrowKtTest { FieldType.nullable(ArrowType.Timestamp(TimeUnit.SECOND, null)), null, ) + + val timeStampSecTZ = Field( + "ts_sec_tz", + FieldType.nullable(ArrowType.Timestamp(TimeUnit.SECOND, "UTC")), + null, + ) + val schemaTimeStamp = Schema( - listOf(timeStampNano, timeStampNanoTZ, timeStampMicro, timeStampMicroTZ, timeStampMilli, timeStampMilliTZ, timeStampSec), + listOf(timeStampNano, timeStampNanoTZ, timeStampMicro, timeStampMicroTZ, timeStampMilli, timeStampMilliTZ, timeStampSec, timeStampSecTZ), ) VectorSchemaRoot.create(schemaTimeStamp, allocator).use { vectorSchemaRoot -> val timeStampMilliVector = vectorSchemaRoot.getVector("ts_milli") as TimeStampMilliVector @@ -601,6 +610,7 @@ internal class ArrowKtTest { val timeStampMicroVector = vectorSchemaRoot.getVector("ts_micro") as TimeStampMicroVector val timeStampMicroTZVector = vectorSchemaRoot.getVector("ts_micro_tz") as TimeStampMicroTZVector val timeStampSecVector = vectorSchemaRoot.getVector("ts_sec") as TimeStampSecVector + val timeStampSecTZVector = vectorSchemaRoot.getVector("ts_sec_tz") as TimeStampSecTZVector timeStampMilliVector.allocateNew(dates.size) timeStampMilliTZVector.allocateNew(dates.size) timeStampNanoVector.allocateNew(dates.size) @@ -608,6 +618,7 @@ internal class ArrowKtTest { timeStampMicroVector.allocateNew(dates.size) timeStampMicroTZVector.allocateNew(dates.size) timeStampSecVector.allocateNew(dates.size) + timeStampSecTZVector.allocateNew(dates.size) dates.forEachIndexed { index, localDateTime -> val instant = localDateTime.toInstant(UtcOffset.ZERO).toJavaInstant() @@ -618,6 +629,7 @@ internal class ArrowKtTest { timeStampMilliVector[index] = instant.toEpochMilli() timeStampMilliTZVector[index] = instant.toEpochMilli() timeStampSecVector[index] = instant.toEpochMilli() / 1_000L + timeStampSecTZVector[index] = instant.toEpochMilli() / 1_000L } vectorSchemaRoot.setRowCount(dates.size) val bos = ByteArrayOutputStream()