Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement TimeStampXXXTZVector for parquet isAdjustedToUTC #926 #576 #577 #927

Draft
wants to merge 7 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
package org.jetbrains.kotlinx.dataframe.io

import kotlinx.datetime.Instant
import kotlinx.datetime.LocalDate
import kotlinx.datetime.LocalDateTime
import kotlinx.datetime.LocalTime
import kotlinx.datetime.TimeZone
import kotlinx.datetime.toKotlinLocalDate
import kotlinx.datetime.toKotlinLocalDateTime
import kotlinx.datetime.toKotlinLocalTime
import kotlinx.datetime.toLocalDateTime
import org.apache.arrow.memory.RootAllocator
import org.apache.arrow.vector.BigIntVector
import org.apache.arrow.vector.BitVector
Expand All @@ -25,9 +28,13 @@ import org.apache.arrow.vector.TimeMicroVector
import org.apache.arrow.vector.TimeMilliVector
import org.apache.arrow.vector.TimeNanoVector
import org.apache.arrow.vector.TimeSecVector
import org.apache.arrow.vector.TimeStampMicroTZVector
import org.apache.arrow.vector.TimeStampMicroVector
import org.apache.arrow.vector.TimeStampMilliTZVector
import org.apache.arrow.vector.TimeStampMilliVector
import org.apache.arrow.vector.TimeStampNanoTZVector
import org.apache.arrow.vector.TimeStampNanoVector
import org.apache.arrow.vector.TimeStampSecTZVector
import org.apache.arrow.vector.TimeStampSecVector
import org.apache.arrow.vector.TinyIntVector
import org.apache.arrow.vector.UInt1Vector
Expand Down Expand Up @@ -63,6 +70,7 @@ import java.math.BigDecimal
import java.math.BigInteger
import java.nio.channels.ReadableByteChannel
import java.nio.channels.SeekableByteChannel
import java.util.concurrent.TimeUnit
import kotlin.reflect.KType
import kotlin.reflect.full.withNullability
import kotlin.reflect.typeOf
Expand Down Expand Up @@ -170,6 +178,16 @@ private fun TimeStampNanoVector.values(range: IntRange): List<LocalDateTime?> =
}
}

private fun TimeStampNanoTZVector.values(range: IntRange): List<LocalDateTime?> =
range.mapIndexed { i, it ->
if (isNull(i)) {
null
} else {
DateUtility.getLocalDateTimeFromEpochNano(getObject(it), this.timeZone)
.toKotlinLocalDateTime()
}
}

private fun TimeStampMicroVector.values(range: IntRange): List<LocalDateTime?> =
range.mapIndexed { i, it ->
if (isNull(i)) {
Expand All @@ -179,6 +197,16 @@ private fun TimeStampMicroVector.values(range: IntRange): List<LocalDateTime?> =
}
}

private fun TimeStampMicroTZVector.values(range: IntRange): List<LocalDateTime?> =
range.mapIndexed { i, it ->
if (isNull(i)) {
null
} else {
DateUtility.getLocalDateTimeFromEpochMicro(getObject(it), this.timeZone)
.toKotlinLocalDateTime()
}
}

private fun TimeStampMilliVector.values(range: IntRange): List<LocalDateTime?> =
range.mapIndexed { i, it ->
if (isNull(i)) {
Expand All @@ -188,6 +216,16 @@ private fun TimeStampMilliVector.values(range: IntRange): List<LocalDateTime?> =
}
}

private fun TimeStampMilliTZVector.values(range: IntRange): List<LocalDateTime?> =
range.mapIndexed { i, it ->
if (isNull(i)) {
null
} else {
Instant.fromEpochMilliseconds(getObject(it))
.toLocalDateTime(TimeZone.of(this.timeZone))
}
}

private fun TimeStampSecVector.values(range: IntRange): List<LocalDateTime?> =
range.mapIndexed { i, it ->
if (isNull(i)) {
Expand All @@ -197,6 +235,16 @@ private fun TimeStampSecVector.values(range: IntRange): List<LocalDateTime?> =
}
}

private fun TimeStampSecTZVector.values(range: IntRange): List<LocalDateTime?> =
range.mapIndexed { i, it ->
if (isNull(i)) {
null
} else {
DateUtility.getLocalDateTimeFromEpochMilli(TimeUnit.SECONDS.toMillis(getObject(it)), this.timeZone)
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

at second glance I'm not so sure about this one.
Parquet does not have seconds precision and my PR is for Parquet, but pyarrow feather can .floor('S') its datetimes.

I'm not sure what's going to be present in its .feather file, either seconds or milliseconds.

I need to test this against a .feather file together with seconds precision and timezone awareness, perhaps one from https://github.com/Kotlin/dataframe/tree/master/dataframe-arrow/src/test/resources

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Your pull request focuses on Parquet, but it would be beneficial to incorporate upgrades for all Arrow TimeStamp types as well, because Parquet reading (in my PR) relies totally on Arrow.
The test testTimeStamp covers both cases for IPC format and Feather format (by using ArrowStreamWriter and ArrowFileWriter), so you only should add TimeStampNanoTZVector,TimeStampMicroVector and TimeStampSecTZVector to the writeArrowTimestamp method and to the expected DataFrame in testTimeStamp as you did for TimeStampMicroTZVector.
Finally as your PR improves Arrow types compatibility it could be merge independently of #577 (IMHO) 😃

.toKotlinLocalDateTime()
}
}

private fun StructVector.values(range: IntRange): List<Map<String, Any?>?> =
range.map {
getObject(it)
Expand Down Expand Up @@ -343,12 +391,20 @@ private fun readField(root: VectorSchemaRoot, field: Field, nullability: Nullabi

is TimeStampNanoVector -> vector.values(range).withTypeNullable(field.isNullable, nullability)

is TimeStampNanoTZVector -> vector.values(range).withTypeNullable(field.isNullable, nullability)

is TimeStampMicroVector -> vector.values(range).withTypeNullable(field.isNullable, nullability)

is TimeStampMicroTZVector -> vector.values(range).withTypeNullable(field.isNullable, nullability)
laurentperez marked this conversation as resolved.
Show resolved Hide resolved

is TimeStampMilliVector -> vector.values(range).withTypeNullable(field.isNullable, nullability)

is TimeStampMilliTZVector -> vector.values(range).withTypeNullable(field.isNullable, nullability)

is TimeStampSecVector -> vector.values(range).withTypeNullable(field.isNullable, nullability)

is TimeStampSecTZVector -> vector.values(range).withTypeNullable(field.isNullable, nullability)

is StructVector -> vector.values(range).withTypeNullable(field.isNullable, nullability)

is NullVector -> vector.values(range).withTypeNullable(field.isNullable, nullability)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,13 @@ import kotlinx.datetime.UtcOffset
import kotlinx.datetime.toInstant
import kotlinx.datetime.toJavaInstant
import org.apache.arrow.memory.RootAllocator
import org.apache.arrow.vector.TimeStampMicroTZVector
import org.apache.arrow.vector.TimeStampMicroVector
import org.apache.arrow.vector.TimeStampMilliTZVector
import org.apache.arrow.vector.TimeStampMilliVector
import org.apache.arrow.vector.TimeStampNanoTZVector
import org.apache.arrow.vector.TimeStampNanoVector
import org.apache.arrow.vector.TimeStampSecTZVector
import org.apache.arrow.vector.TimeStampSecVector
import org.apache.arrow.vector.VectorSchemaRoot
import org.apache.arrow.vector.ipc.ArrowFileReader
Expand Down Expand Up @@ -532,9 +536,13 @@ internal class ArrowKtTest {

val dataFrame = dataFrameOf(
"ts_nano" to dates,
"ts_nano_tz" to dates,
"ts_micro" to dates,
"ts_micro_tz" to dates,
"ts_milli" to dates,
"ts_milli_tz" to dates,
"ts_sec" to dates,
"ts_sec_tz" to dates,
)

DataFrame.readArrowFeather(writeArrowTimestamp(dates)) shouldBe dataFrame
Expand All @@ -549,42 +557,79 @@ internal class ArrowKtTest {
null,
)

val timeStampMilliTZ = Field(
"ts_milli_tz",
FieldType.nullable(ArrowType.Timestamp(TimeUnit.MILLISECOND, "UTC")),
null,
)

val timeStampMicro = Field(
"ts_micro",
FieldType.nullable(ArrowType.Timestamp(TimeUnit.MICROSECOND, null)),
null,
)

val timeStampMicroTZ = Field(
"ts_micro_tz",
FieldType.nullable(ArrowType.Timestamp(TimeUnit.MICROSECOND, "UTC")),
null,
)

val timeStampNano = Field(
"ts_nano",
FieldType.nullable(ArrowType.Timestamp(TimeUnit.NANOSECOND, null)),
null,
)

val timeStampNanoTZ = Field(
"ts_nano_tz",
FieldType.nullable(ArrowType.Timestamp(TimeUnit.NANOSECOND, "UTC")),
null,
)

val timeStampSec = Field(
"ts_sec",
FieldType.nullable(ArrowType.Timestamp(TimeUnit.SECOND, null)),
null,
)

val timeStampSecTZ = Field(
"ts_sec_tz",
FieldType.nullable(ArrowType.Timestamp(TimeUnit.SECOND, "UTC")),
null,
)

val schemaTimeStamp = Schema(
listOf(timeStampNano, timeStampMicro, timeStampMilli, timeStampSec),
listOf(timeStampNano, timeStampNanoTZ, timeStampMicro, timeStampMicroTZ, timeStampMilli, timeStampMilliTZ, timeStampSec, timeStampSecTZ),
)
VectorSchemaRoot.create(schemaTimeStamp, allocator).use { vectorSchemaRoot ->
val timeStampMilliVector = vectorSchemaRoot.getVector("ts_milli") as TimeStampMilliVector
val timeStampMilliTZVector = vectorSchemaRoot.getVector("ts_milli_tz") as TimeStampMilliTZVector
val timeStampNanoVector = vectorSchemaRoot.getVector("ts_nano") as TimeStampNanoVector
val timeStampNanoTZVector = vectorSchemaRoot.getVector("ts_nano_tz") as TimeStampNanoTZVector
val timeStampMicroVector = vectorSchemaRoot.getVector("ts_micro") as TimeStampMicroVector
val timeStampMicroTZVector = vectorSchemaRoot.getVector("ts_micro_tz") as TimeStampMicroTZVector
val timeStampSecVector = vectorSchemaRoot.getVector("ts_sec") as TimeStampSecVector
val timeStampSecTZVector = vectorSchemaRoot.getVector("ts_sec_tz") as TimeStampSecTZVector
timeStampMilliVector.allocateNew(dates.size)
timeStampMilliTZVector.allocateNew(dates.size)
timeStampNanoVector.allocateNew(dates.size)
timeStampNanoTZVector.allocateNew(dates.size)
timeStampMicroVector.allocateNew(dates.size)
timeStampMicroTZVector.allocateNew(dates.size)
timeStampSecVector.allocateNew(dates.size)
timeStampSecTZVector.allocateNew(dates.size)

dates.forEachIndexed { index, localDateTime ->
val instant = localDateTime.toInstant(UtcOffset.ZERO).toJavaInstant()
timeStampNanoVector[index] = instant.toEpochMilli() * 1_000_000L + instant.nano
timeStampNanoTZVector[index] = instant.toEpochMilli() * 1_000_000L + instant.nano
timeStampMicroVector[index] = instant.toEpochMilli() * 1_000L
timeStampMicroTZVector[index] = instant.toEpochMilli() * 1_000L
timeStampMilliVector[index] = instant.toEpochMilli()
timeStampMilliTZVector[index] = instant.toEpochMilli()
timeStampSecVector[index] = instant.toEpochMilli() / 1_000L
timeStampSecTZVector[index] = instant.toEpochMilli() / 1_000L
}
vectorSchemaRoot.setRowCount(dates.size)
val bos = ByteArrayOutputStream()
Expand Down