From d162c89e55b8d3f65cf293a1bc2ea5b9dee6571a Mon Sep 17 00:00:00 2001 From: Shivam Malhotra Date: Fri, 10 Nov 2023 15:19:46 -0600 Subject: [PATCH] Bug fixes for Parquet DATE and TIME, and improved support for TIMESTAMP (#4801) --- .../engine/table/impl/CodecLookup.java | 2 + .../java/io/deephaven/time/DateTimeUtils.java | 53 +++++++++ .../io/deephaven/time/TestDateTimeUtils.java | 16 +++ .../parquet/base/ParquetFileReader.java | 63 +++++------ .../parquet/table/ParquetSchemaReader.java | 17 ++- .../io/deephaven/parquet/table/TypeInfos.java | 101 ++++++++---------- .../table/location/ParquetColumnLocation.java | 7 +- .../pagestore/topage/ToLocalDateTimePage.java | 100 +++++++++++++++++ .../table/pagestore/topage/ToTimePage.java | 12 +-- .../parquet/table/transfer/DateTransfer.java | 4 +- .../transfer/LocalDateTimeArrayTransfer.java | 44 ++++++++ .../table/transfer/LocalDateTimeTransfer.java | 37 +++++++ .../transfer/LocalDateTimeVectorTransfer.java | 40 +++++++ .../parquet/table/transfer/TimeTransfer.java | 8 +- .../table/transfer/TransferObject.java | 10 ++ extensions/parquet/table/src/test/e0.py | 3 +- extensions/parquet/table/src/test/e1.py | 3 +- extensions/parquet/table/src/test/e2.py | 3 +- .../table/ParquetTableReadWriteTest.java | 32 ++++-- .../src/test/resources/e0/brotli.parquet | 4 +- .../table/src/test/resources/e0/gzip.parquet | 4 +- .../table/src/test/resources/e0/lz4.parquet | 4 +- .../src/test/resources/e0/snappy.parquet | 4 +- .../test/resources/e0/uncompressed.parquet | 4 +- .../table/src/test/resources/e0/zstd.parquet | 4 +- .../src/test/resources/e1/brotli.parquet | 4 +- .../table/src/test/resources/e1/gzip.parquet | 4 +- .../table/src/test/resources/e1/lz4.parquet | 4 +- .../src/test/resources/e1/snappy.parquet | 4 +- .../test/resources/e1/uncompressed.parquet | 4 +- .../table/src/test/resources/e1/zstd.parquet | 4 +- .../src/test/resources/e2/brotli.parquet | 4 +- .../table/src/test/resources/e2/gzip.parquet | 4 +- 
.../table/src/test/resources/e2/lz4.parquet | 4 +- .../src/test/resources/e2/snappy.parquet | 4 +- .../test/resources/e2/uncompressed.parquet | 4 +- .../table/src/test/resources/e2/zstd.parquet | 4 +- py/server/tests/test_parquet.py | 56 ++++++---- .../ReplicateParquetTransferObjects.java | 27 +++++ 39 files changed, 527 insertions(+), 183 deletions(-) create mode 100644 extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/topage/ToLocalDateTimePage.java create mode 100644 extensions/parquet/table/src/main/java/io/deephaven/parquet/table/transfer/LocalDateTimeArrayTransfer.java create mode 100644 extensions/parquet/table/src/main/java/io/deephaven/parquet/table/transfer/LocalDateTimeTransfer.java create mode 100644 extensions/parquet/table/src/main/java/io/deephaven/parquet/table/transfer/LocalDateTimeVectorTransfer.java diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/CodecLookup.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/CodecLookup.java index f60cad3f99e..f73d118cc17 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/CodecLookup.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/CodecLookup.java @@ -20,6 +20,7 @@ import java.time.Instant; import java.time.LocalDate; import java.time.LocalTime; +import java.time.LocalDateTime; /** * Utility class to concentrate {@link ObjectCodec} lookups. 
@@ -76,6 +77,7 @@ private static boolean noCodecRequired(@NotNull final Class dataType) { dataType == Instant.class || dataType == LocalDate.class || dataType == LocalTime.class || + dataType == LocalDateTime.class || dataType == String.class || // A BigDecimal column maps to a logical type of decimal, with // appropriate precision and scale calculated from column data, diff --git a/engine/time/src/main/java/io/deephaven/time/DateTimeUtils.java b/engine/time/src/main/java/io/deephaven/time/DateTimeUtils.java index 4c1a3846e68..7f86e80403e 100644 --- a/engine/time/src/main/java/io/deephaven/time/DateTimeUtils.java +++ b/engine/time/src/main/java/io/deephaven/time/DateTimeUtils.java @@ -21,6 +21,7 @@ import java.time.zone.ZoneRulesException; import java.util.Date; import java.util.Objects; +import java.util.concurrent.TimeUnit; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -981,6 +982,21 @@ public static long epochNanos(@Nullable final ZonedDateTime dateTime) { return safeComputeNanos(dateTime.toEpochSecond(), dateTime.getNano()); } + /** + * Returns nanoseconds from the Epoch for a {@link LocalDateTime} value in UTC timezone. + * + * @param localDateTime the local date time to compute the Epoch offset for + * @return nanoseconds since Epoch, or a NULL_LONG value if the local date time is null + */ + @ScriptApi + public static long epochNanosUTC(@Nullable final LocalDateTime localDateTime) { + if (localDateTime == null) { + return NULL_LONG; + } + return TimeUnit.SECONDS.toNanos(localDateTime.toEpochSecond(ZoneOffset.UTC)) + + localDateTime.toLocalTime().getNano(); + } + /** * Returns microseconds from the Epoch for an {@link Instant} value. * @@ -1399,6 +1415,43 @@ public static ZonedDateTime excelToZonedDateTime(final double excel, @Nullable f return epochMillisToZonedDateTime(excelTimeToEpochMillis(excel, timeZone), timeZone); } + /** + * Converts nanoseconds from the Epoch to a {@link LocalDateTime} in UTC timezone. 
+ * + * @param nanos nanoseconds since Epoch + * @return {@code null} if the input is {@link QueryConstants#NULL_LONG}; otherwise the input nanoseconds from the + * Epoch converted to a {@link LocalDateTime} in UTC timezone + */ + public static @Nullable LocalDateTime epochNanosToLocalDateTimeUTC(final long nanos) { + return nanos == NULL_LONG ? null + : LocalDateTime.ofEpochSecond(nanos / 1_000_000_000L, (int) (nanos % 1_000_000_000L), ZoneOffset.UTC); + } + + /** + * Converts microseconds from the Epoch to a {@link LocalDateTime} in UTC timezone. + * + * @param micros microseconds since Epoch + * @return {@code null} if the input is {@link QueryConstants#NULL_LONG}; otherwise the input microseconds from the + * Epoch converted to a {@link LocalDateTime} in UTC timezone + */ + public static @Nullable LocalDateTime epochMicrosToLocalDateTimeUTC(final long micros) { + return micros == NULL_LONG ? null + : LocalDateTime.ofEpochSecond(micros / 1_000_000L, (int) ((micros % 1_000_000L) * MICRO), + ZoneOffset.UTC); + } + + /** + * Converts milliseconds from the Epoch to a {@link LocalDateTime} in UTC timezone. + * + * @param millis milliseconds since Epoch + * @return {@code null} if the input is {@link QueryConstants#NULL_LONG}; otherwise the input milliseconds from the + * Epoch converted to a {@link LocalDateTime} in UTC timezone + */ + public static @Nullable LocalDateTime epochMillisToLocalDateTimeUTC(final long millis) { + return millis == NULL_LONG ? 
null + : LocalDateTime.ofEpochSecond(millis / 1_000L, (int) ((millis % 1_000L) * MILLI), ZoneOffset.UTC); + } + // endregion // region Arithmetic diff --git a/engine/time/src/test/java/io/deephaven/time/TestDateTimeUtils.java b/engine/time/src/test/java/io/deephaven/time/TestDateTimeUtils.java index c8fbfc2d78d..4e3e6b3d501 100644 --- a/engine/time/src/test/java/io/deephaven/time/TestDateTimeUtils.java +++ b/engine/time/src/test/java/io/deephaven/time/TestDateTimeUtils.java @@ -1391,6 +1391,10 @@ public void testEpochNanos() { TestCase.assertEquals(nanos, DateTimeUtils.epochNanos(dt3)); TestCase.assertEquals(NULL_LONG, DateTimeUtils.epochNanos((ZonedDateTime) null)); + + final LocalDateTime ldt = LocalDateTime.ofInstant(dt2, ZoneId.of("UTC")); + TestCase.assertEquals(nanos, DateTimeUtils.epochNanosUTC(ldt)); + TestCase.assertEquals(NULL_LONG, DateTimeUtils.epochNanosUTC(null)); } public void testEpochMicros() { @@ -1456,6 +1460,10 @@ public void testEpochNanosTo() { TestCase.assertEquals(dt3, DateTimeUtils.epochNanosToZonedDateTime(nanos, TZ_JP)); TestCase.assertNull(DateTimeUtils.epochNanosToZonedDateTime(NULL_LONG, TZ_JP)); TestCase.assertNull(DateTimeUtils.epochNanosToZonedDateTime(nanos, null)); + + final LocalDateTime ldt = LocalDateTime.ofInstant(dt2, ZoneId.of("UTC")); + TestCase.assertEquals(ldt, DateTimeUtils.epochNanosToLocalDateTimeUTC(nanos)); + TestCase.assertNull(DateTimeUtils.epochNanosToLocalDateTimeUTC(NULL_LONG)); } public void testEpochMicrosTo() { @@ -1471,6 +1479,10 @@ public void testEpochMicrosTo() { TestCase.assertEquals(dt3, DateTimeUtils.epochMicrosToZonedDateTime(micros, TZ_JP)); TestCase.assertNull(DateTimeUtils.epochMicrosToZonedDateTime(NULL_LONG, TZ_JP)); TestCase.assertNull(DateTimeUtils.epochMicrosToZonedDateTime(micros, null)); + + final LocalDateTime ldt = LocalDateTime.ofInstant(dt2, ZoneId.of("UTC")); + TestCase.assertEquals(ldt, DateTimeUtils.epochMicrosToLocalDateTimeUTC(micros)); + 
TestCase.assertNull(DateTimeUtils.epochMicrosToLocalDateTimeUTC(NULL_LONG)); } public void testEpochMillisTo() { @@ -1486,6 +1498,10 @@ public void testEpochMillisTo() { TestCase.assertEquals(dt3, DateTimeUtils.epochMillisToZonedDateTime(millis, TZ_JP)); TestCase.assertNull(DateTimeUtils.epochMillisToZonedDateTime(NULL_LONG, TZ_JP)); TestCase.assertNull(DateTimeUtils.epochMillisToZonedDateTime(millis, null)); + + final LocalDateTime ldt = LocalDateTime.ofInstant(dt2, ZoneId.of("UTC")); + TestCase.assertEquals(ldt, DateTimeUtils.epochMillisToLocalDateTimeUTC(millis)); + TestCase.assertNull(DateTimeUtils.epochMillisToLocalDateTimeUTC(NULL_LONG)); } public void testEpochSecondsTo() { diff --git a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ParquetFileReader.java b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ParquetFileReader.java index 3621d69b5a0..1db38879652 100644 --- a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ParquetFileReader.java +++ b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ParquetFileReader.java @@ -236,23 +236,13 @@ private static void buildChildren(Types.GroupBuilder builder, Iterator> visit(final LogicalTypeAnnotation.TimeLogicalTypeAnnot @Override public Optional> visit( final LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestampLogicalType) { - // TODO(deephaven-core#976): Unable to read parquet TimestampLogicalTypeAnnotation that is not adjusted - // to UTC - if (timestampLogicalType.isAdjustedToUTC()) { - switch (timestampLogicalType.getUnit()) { - case MILLIS: - case MICROS: - case NANOS: - return Optional.of(Instant.class); - } + switch (timestampLogicalType.getUnit()) { + case MILLIS: + case MICROS: + case NANOS: + // TIMESTAMP fields if adjusted to UTC are read as Instants, else as LocalDatetimes. + return timestampLogicalType.isAdjustedToUTC() ? 
Optional.of(Instant.class) + : Optional.of(LocalDateTime.class); } errorString.setValue("TimestampLogicalType, isAdjustedToUTC=" + timestampLogicalType.isAdjustedToUTC() + ", unit=" + timestampLogicalType.getUnit()); diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/TypeInfos.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/TypeInfos.java index 37a78a7a360..48d43f44dc4 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/TypeInfos.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/TypeInfos.java @@ -26,6 +26,7 @@ import java.time.Instant; import java.time.LocalDate; import java.time.LocalTime; +import java.time.LocalDateTime; import java.util.*; import java.util.function.Supplier; @@ -51,6 +52,7 @@ public class TypeInfos { BigIntegerType.INSTANCE, LocalDateType.INSTANCE, LocalTimeType.INSTANCE, + LocalDateTimeType.INSTANCE, }; private static final Map, TypeInfo> BY_CLASS; @@ -150,10 +152,7 @@ public Set> getTypes() { } @Override - public PrimitiveBuilder getBuilder(boolean required, boolean repeating, Class dataType) { - if (!isValidFor(dataType)) { - throw new IllegalArgumentException("Invalid data type " + dataType); - } + public PrimitiveBuilder getBuilderImpl(boolean required, boolean repeating, Class dataType) { return type(PrimitiveTypeName.BINARY, required, repeating) .as(LogicalTypeAnnotation.decimalType(precisionAndScale.scale, precisionAndScale.precision)); } @@ -193,10 +192,7 @@ public Set> getTypes() { } @Override - public PrimitiveBuilder getBuilder(boolean required, boolean repeating, Class dataType) { - if (!isValidFor(dataType)) { - throw new IllegalArgumentException("Invalid data type " + dataType); - } + public PrimitiveBuilder getBuilderImpl(boolean required, boolean repeating, Class dataType) { return type(PrimitiveTypeName.INT32, required, repeating).as(LogicalTypeAnnotation.intType(32, true)); } } @@ -213,10 +209,7 @@ public Set> getTypes() 
{ } @Override - public PrimitiveBuilder getBuilder(boolean required, boolean repeating, Class dataType) { - if (!isValidFor(dataType)) { - throw new IllegalArgumentException("Invalid data type " + dataType); - } + public PrimitiveBuilder getBuilderImpl(boolean required, boolean repeating, Class dataType) { return type(PrimitiveTypeName.INT64, required, repeating); } } @@ -233,10 +226,7 @@ public Set> getTypes() { } @Override - public PrimitiveBuilder getBuilder(boolean required, boolean repeating, Class dataType) { - if (!isValidFor(dataType)) { - throw new IllegalArgumentException("Invalid data type " + dataType); - } + public PrimitiveBuilder getBuilderImpl(boolean required, boolean repeating, Class dataType) { return type(PrimitiveTypeName.INT32, required, repeating).as(LogicalTypeAnnotation.intType(16, true)); } } @@ -253,10 +243,7 @@ public Set> getTypes() { } @Override - public PrimitiveBuilder getBuilder(boolean required, boolean repeating, Class dataType) { - if (!isValidFor(dataType)) { - throw new IllegalArgumentException("Invalid data type " + dataType); - } + public PrimitiveBuilder getBuilderImpl(boolean required, boolean repeating, Class dataType) { return type(PrimitiveTypeName.BOOLEAN, required, repeating); } } @@ -273,10 +260,7 @@ public Set> getTypes() { } @Override - public PrimitiveBuilder getBuilder(boolean required, boolean repeating, Class dataType) { - if (!isValidFor(dataType)) { - throw new IllegalArgumentException("Invalid data type " + dataType); - } + public PrimitiveBuilder getBuilderImpl(boolean required, boolean repeating, Class dataType) { return type(PrimitiveTypeName.FLOAT, required, repeating); } } @@ -293,10 +277,7 @@ public Set> getTypes() { } @Override - public PrimitiveBuilder getBuilder(boolean required, boolean repeating, Class dataType) { - if (!isValidFor(dataType)) { - throw new IllegalArgumentException("Invalid data type " + dataType); - } + public PrimitiveBuilder getBuilderImpl(boolean required, boolean repeating, 
Class dataType) { return type(PrimitiveTypeName.DOUBLE, required, repeating); } } @@ -313,10 +294,7 @@ public Set> getTypes() { } @Override - public PrimitiveBuilder getBuilder(boolean required, boolean repeating, Class dataType) { - if (!isValidFor(dataType)) { - throw new IllegalArgumentException("Invalid data type " + dataType); - } + public PrimitiveBuilder getBuilderImpl(boolean required, boolean repeating, Class dataType) { return type(PrimitiveTypeName.INT32, required, repeating).as(LogicalTypeAnnotation.intType(16, false)); } } @@ -333,10 +311,7 @@ public Set> getTypes() { } @Override - public PrimitiveBuilder getBuilder(boolean required, boolean repeating, Class dataType) { - if (!isValidFor(dataType)) { - throw new IllegalArgumentException("Invalid data type " + dataType); - } + public PrimitiveBuilder getBuilderImpl(boolean required, boolean repeating, Class dataType) { return type(PrimitiveTypeName.INT32, required, repeating).as(LogicalTypeAnnotation.intType(8, true)); } } @@ -352,10 +327,7 @@ public Set> getTypes() { } @Override - public PrimitiveBuilder getBuilder(boolean required, boolean repeating, Class dataType) { - if (!isValidFor(dataType)) { - throw new IllegalArgumentException("Invalid data type " + dataType); - } + public PrimitiveBuilder getBuilderImpl(boolean required, boolean repeating, Class dataType) { return type(PrimitiveTypeName.BINARY, required, repeating) .as(LogicalTypeAnnotation.stringType()); } @@ -372,15 +344,31 @@ public Set> getTypes() { } @Override - public PrimitiveBuilder getBuilder(boolean required, boolean repeating, Class dataType) { - if (!isValidFor(dataType)) { - throw new IllegalArgumentException("Invalid data type " + dataType); - } + public PrimitiveBuilder getBuilderImpl(boolean required, boolean repeating, Class dataType) { + // Write instants as Parquet TIMESTAMP(isAdjustedToUTC = true, unit = NANOS) return type(PrimitiveTypeName.INT64, required, repeating) .as(LogicalTypeAnnotation.timestampType(true, 
LogicalTypeAnnotation.TimeUnit.NANOS)); } } + private enum LocalDateTimeType implements TypeInfo { + INSTANCE; + + private static final Set> clazzes = Collections.singleton(LocalDateTime.class); + + @Override + public Set> getTypes() { + return clazzes; + } + + @Override + public PrimitiveBuilder getBuilderImpl(boolean required, boolean repeating, Class dataType) { + // Write LocalDateTime as Parquet TIMESTAMP(isAdjustedToUTC = false, unit = NANOS) + return type(PrimitiveTypeName.INT64, required, repeating) + .as(LogicalTypeAnnotation.timestampType(false, LogicalTypeAnnotation.TimeUnit.NANOS)); + } + } + private enum LocalDateType implements TypeInfo { INSTANCE; @@ -392,10 +380,7 @@ public Set> getTypes() { } @Override - public PrimitiveBuilder getBuilder(boolean required, boolean repeating, Class dataType) { - if (!isValidFor(dataType)) { - throw new IllegalArgumentException("Invalid data type " + dataType); - } + public PrimitiveBuilder getBuilderImpl(boolean required, boolean repeating, Class dataType) { return type(PrimitiveTypeName.INT32, required, repeating) .as(LogicalTypeAnnotation.dateType()); } @@ -412,10 +397,7 @@ public Set> getTypes() { } @Override - public PrimitiveBuilder getBuilder(boolean required, boolean repeating, Class dataType) { - if (!isValidFor(dataType)) { - throw new IllegalArgumentException("Invalid data type " + dataType); - } + public PrimitiveBuilder getBuilderImpl(boolean required, boolean repeating, Class dataType) { // Always write in (isAdjustedToUTC = true, unit = NANOS) format return type(PrimitiveTypeName.INT64, required, repeating) .as(LogicalTypeAnnotation.timeType(true, LogicalTypeAnnotation.TimeUnit.NANOS)); @@ -439,7 +421,7 @@ public Set> getTypes() { } @Override - public PrimitiveBuilder getBuilder(boolean required, boolean repeating, Class dataType) { + public PrimitiveBuilder getBuilderImpl(boolean required, boolean repeating, Class dataType) { return type(PrimitiveTypeName.BINARY, required, repeating) 
.as(LogicalTypeAnnotation.decimalType(0, 1)); } @@ -454,6 +436,17 @@ default boolean isValidFor(Class clazz) { return getTypes().contains(clazz); } + default PrimitiveBuilder getBuilderImpl(boolean required, boolean repeating, Class dataType) { + throw new UnsupportedOperationException("Implement this method if using the default getBuilder()"); + } + + default PrimitiveBuilder getBuilder(boolean required, boolean repeating, Class dataType) { + if (!isValidFor(dataType)) { + throw new IllegalArgumentException("Invalid data type " + dataType); + } + return getBuilderImpl(required, repeating, dataType); + } + default Type createSchemaType( @NotNull final ColumnDefinition columnDefinition, @NotNull final ParquetInstructions instructions) { @@ -486,8 +479,6 @@ default Type createSchemaType( builder.named("item")).named(parquetColumnName)) .as(LogicalTypeAnnotation.listType()).named(parquetColumnName); } - - PrimitiveBuilder getBuilder(boolean required, boolean repeating, Class dataType); } private static class CodecType implements TypeInfo { diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetColumnLocation.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetColumnLocation.java index 1adf7044ce7..40a4aaf6fdd 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetColumnLocation.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetColumnLocation.java @@ -723,13 +723,10 @@ private static class LogicalTypeVisitor @Override public Optional> visit( final LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestampLogicalType) { - // TODO(deephaven-core#976): Unable to read parquet TimestampLogicalTypeAnnotation that is not adjusted - // to UTC if (timestampLogicalType.isAdjustedToUTC()) { - return Optional - .of(ToInstantPage.create(componentType, timestampLogicalType.getUnit())); + return 
Optional.of(ToInstantPage.create(componentType, timestampLogicalType.getUnit())); } - return Optional.empty(); + return Optional.of(ToLocalDateTimePage.create(componentType, timestampLogicalType.getUnit())); } @Override diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/topage/ToLocalDateTimePage.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/topage/ToLocalDateTimePage.java new file mode 100644 index 00000000000..8f906915d56 --- /dev/null +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/topage/ToLocalDateTimePage.java @@ -0,0 +1,100 @@ +/** + * Copyright (c) 2016-2022 Deephaven Data Labs and Patent Pending + */ +package io.deephaven.parquet.table.pagestore.topage; + +import io.deephaven.chunk.ChunkType; +import io.deephaven.chunk.attributes.Any; +import io.deephaven.time.DateTimeUtils; +import io.deephaven.util.QueryConstants; +import org.apache.parquet.schema.LogicalTypeAnnotation; +import org.jetbrains.annotations.NotNull; + +import java.time.LocalDateTime; +import java.util.function.LongFunction; + +/** + * Used to convert Parquet TIMESTAMP values with {@code isAdjustedToUTC=false} to {@link LocalDateTime}. Ref: ... 
+ */ +public class ToLocalDateTimePage implements ToPage { + + @SuppressWarnings("rawtypes") + private static final ToPage MILLIS_INSTANCE = new ToLocalDateTimePageFromMillis(); + @SuppressWarnings("rawtypes") + private static final ToPage MICROS_INSTANCE = new ToLocalDateTimePageFromMicros(); + @SuppressWarnings("rawtypes") + private static final ToPage NANOS_INSTANCE = new ToLocalDateTimePageFromNanos(); + + @SuppressWarnings("unchecked") + public static ToPage create(@NotNull final Class nativeType, + @NotNull final LogicalTypeAnnotation.TimeUnit unit) { + if (LocalDateTime.class.equals(nativeType)) { + switch (unit) { + case MILLIS: + return MILLIS_INSTANCE; + case MICROS: + return MICROS_INSTANCE; + case NANOS: + return NANOS_INSTANCE; + default: + throw new IllegalArgumentException("Unsupported unit=" + unit); + } + } + throw new IllegalArgumentException( + "The native type for a LocalDateTime column is " + nativeType.getCanonicalName()); + } + + ToLocalDateTimePage() {} + + @Override + @NotNull + public final Class getNativeType() { + return LocalDateTime.class; + } + + @Override + @NotNull + public final ChunkType getChunkType() { + return ChunkType.Object; + } + + @Override + @NotNull + public final Object nullValue() { + return QueryConstants.NULL_LONG_BOXED; + } + + private static LocalDateTime[] convertResultHelper(@NotNull final Object result, + @NotNull final LongFunction unitToLocalDateTime) { + final long[] from = (long[]) result; + final LocalDateTime[] to = new LocalDateTime[from.length]; + + for (int i = 0; i < from.length; ++i) { + to[i] = unitToLocalDateTime.apply(from[i]); + } + return to; + } + + private static final class ToLocalDateTimePageFromMillis extends ToLocalDateTimePage { + @Override + public LocalDateTime[] convertResult(@NotNull final Object result) { + return convertResultHelper(result, DateTimeUtils::epochMillisToLocalDateTimeUTC); + } + } + + private static final class ToLocalDateTimePageFromMicros extends ToLocalDateTimePage { 
+ @Override + public LocalDateTime[] convertResult(@NotNull final Object result) { + return convertResultHelper(result, DateTimeUtils::epochMicrosToLocalDateTimeUTC); + } + } + + private static final class ToLocalDateTimePageFromNanos extends ToLocalDateTimePage { + @Override + public LocalDateTime[] convertResult(@NotNull final Object result) { + return convertResultHelper(result, DateTimeUtils::epochNanosToLocalDateTimeUTC); + } + } + +} diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/topage/ToTimePage.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/topage/ToTimePage.java index 604f09a6794..fd64896b4c8 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/topage/ToTimePage.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/topage/ToTimePage.java @@ -11,6 +11,7 @@ import org.jetbrains.annotations.NotNull; import java.time.LocalTime; +import java.util.function.LongFunction; public class ToTimePage implements ToPage { @@ -82,20 +83,13 @@ public final Object nullValue() { return QueryConstants.NULL_LONG_BOXED; } - /** - * Convert a {@code long} value in the units of this page (can be micros or nanos) to a {@link LocalTime} - */ - interface ToLocalTimeFromUnits { - LocalTime apply(final long value); - } - static LocalTime[] convertResultHelper(@NotNull final Object result, - final ToLocalTimeFromUnits toLocalTimeFromUnits) { + @NotNull final LongFunction unitToLocalTime) { final long[] from = (long[]) result; final LocalTime[] to = new LocalTime[from.length]; for (int i = 0; i < from.length; ++i) { - to[i] = toLocalTimeFromUnits.apply(from[i]); + to[i] = unitToLocalTime.apply(from[i]); } return to; } diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/transfer/DateTransfer.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/transfer/DateTransfer.java index 
ad0fb4b8e55..ea124380b33 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/transfer/DateTransfer.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/transfer/DateTransfer.java @@ -3,7 +3,7 @@ */ package io.deephaven.parquet.table.transfer; -import io.deephaven.chunk.WritableObjectChunk; +import io.deephaven.chunk.ObjectChunk; import io.deephaven.chunk.attributes.Values; import io.deephaven.engine.rowset.RowSet; import io.deephaven.engine.table.ColumnSource; @@ -12,7 +12,7 @@ import java.time.LocalDate; -final class DateTransfer extends IntCastablePrimitiveTransfer> { +final class DateTransfer extends IntCastablePrimitiveTransfer> { DateTransfer(@NotNull final ColumnSource columnSource, @NotNull final RowSet tableRowSet, final int targetSize) { super(columnSource, tableRowSet, targetSize); } diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/transfer/LocalDateTimeArrayTransfer.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/transfer/LocalDateTimeArrayTransfer.java new file mode 100644 index 00000000000..16ed5fc6d0f --- /dev/null +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/transfer/LocalDateTimeArrayTransfer.java @@ -0,0 +1,44 @@ +/** + * Copyright (c) 2016-2023 Deephaven Data Labs and Patent Pending + */ +/* + * --------------------------------------------------------------------------------------------------------------------- + * AUTO-GENERATED CLASS - DO NOT EDIT MANUALLY - for any changes edit InstantArrayTransfer and regenerate + * --------------------------------------------------------------------------------------------------------------------- + */ +package io.deephaven.parquet.table.transfer; + +import io.deephaven.engine.rowset.RowSequence; +import io.deephaven.engine.table.ColumnSource; +import io.deephaven.time.DateTimeUtils; +import org.jetbrains.annotations.NotNull; + +import java.nio.LongBuffer; +import 
java.time.LocalDateTime; + +final class LocalDateTimeArrayTransfer + extends PrimitiveArrayAndVectorTransfer { + // We encode LocalDateTime as primitive longs + LocalDateTimeArrayTransfer(@NotNull final ColumnSource columnSource, @NotNull final RowSequence tableRowSet, + final int targetPageSizeInBytes) { + super(columnSource, tableRowSet, targetPageSizeInBytes / Long.BYTES, targetPageSizeInBytes, + LongBuffer.allocate(targetPageSizeInBytes / Long.BYTES), Long.BYTES); + } + + @Override + int getSize(final LocalDateTime @NotNull [] data) { + return data.length; + } + + @Override + void resizeBuffer(final int length) { + buffer = LongBuffer.allocate(length); + } + + @Override + void copyToBuffer(@NotNull final EncodedData data) { + for (final LocalDateTime t : data.encodedValues) { + buffer.put(DateTimeUtils.epochNanosUTC(t)); + } + } +} diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/transfer/LocalDateTimeTransfer.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/transfer/LocalDateTimeTransfer.java new file mode 100644 index 00000000000..38ca4a338ea --- /dev/null +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/transfer/LocalDateTimeTransfer.java @@ -0,0 +1,37 @@ +/** + * Copyright (c) 2016-2023 Deephaven Data Labs and Patent Pending + */ +/* + * --------------------------------------------------------------------------------------------------------------------- + * AUTO-GENERATED CLASS - DO NOT EDIT MANUALLY - for any changes edit TimeTransfer and regenerate + * --------------------------------------------------------------------------------------------------------------------- + */ +package io.deephaven.parquet.table.transfer; + +import io.deephaven.chunk.ObjectChunk; +import io.deephaven.chunk.attributes.Values; +import io.deephaven.engine.rowset.RowSequence; +import io.deephaven.engine.table.ColumnSource; +import io.deephaven.time.DateTimeUtils; +import 
org.jetbrains.annotations.NotNull; + +import java.nio.LongBuffer; +import java.time.LocalDateTime; + +final class LocalDateTimeTransfer extends GettingPrimitiveTransfer, LongBuffer> { + + LocalDateTimeTransfer(@NotNull final ColumnSource columnSource, @NotNull final RowSequence tableRowSet, + final int targetPageSizeInBytes) { + super(columnSource, tableRowSet, + LongBuffer.allocate(Math.toIntExact(Math.min(tableRowSet.size(), targetPageSizeInBytes / Long.BYTES))), + Math.toIntExact(Math.min(tableRowSet.size(), targetPageSizeInBytes / Long.BYTES))); + } + + @Override + void copyAllFromChunkToBuffer() { + final int chunkSize = chunk.size(); + for (int chunkIdx = 0; chunkIdx < chunkSize; ++chunkIdx) { + buffer.put(DateTimeUtils.epochNanosUTC(chunk.get(chunkIdx))); + } + } +} diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/transfer/LocalDateTimeVectorTransfer.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/transfer/LocalDateTimeVectorTransfer.java new file mode 100644 index 00000000000..0fdb16f59c3 --- /dev/null +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/transfer/LocalDateTimeVectorTransfer.java @@ -0,0 +1,40 @@ +/** + * Copyright (c) 2016-2023 Deephaven Data Labs and Patent Pending + */ +/* + * --------------------------------------------------------------------------------------------------------------------- + * AUTO-GENERATED CLASS - DO NOT EDIT MANUALLY - for any changes edit InstantVectorTransfer and regenerate + * --------------------------------------------------------------------------------------------------------------------- + */ +package io.deephaven.parquet.table.transfer; + +import io.deephaven.engine.primitive.iterator.CloseableIterator; +import io.deephaven.engine.rowset.RowSequence; +import io.deephaven.engine.table.ColumnSource; +import io.deephaven.time.DateTimeUtils; +import io.deephaven.vector.ObjectVector; +import org.jetbrains.annotations.NotNull; + +import 
java.nio.LongBuffer; +import java.time.LocalDateTime; + +final class LocalDateTimeVectorTransfer extends PrimitiveVectorTransfer, LongBuffer> { + // We encode LocalDateTime as primitive longs + LocalDateTimeVectorTransfer(@NotNull final ColumnSource columnSource, @NotNull final RowSequence tableRowSet, + final int targetPageSizeInBytes) { + super(columnSource, tableRowSet, targetPageSizeInBytes / Long.BYTES, targetPageSizeInBytes, + LongBuffer.allocate(targetPageSizeInBytes / Long.BYTES), Long.BYTES); + } + + @Override + void resizeBuffer(final int length) { + buffer = LongBuffer.allocate(length); + } + + @Override + void copyToBuffer(@NotNull final EncodedData> data) { + try (final CloseableIterator dataIterator = data.encodedValues.iterator()) { + dataIterator.forEachRemaining((LocalDateTime t) -> buffer.put(DateTimeUtils.epochNanosUTC(t))); + } + } +} diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/transfer/TimeTransfer.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/transfer/TimeTransfer.java index 9021af1511f..48c396a99a4 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/transfer/TimeTransfer.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/transfer/TimeTransfer.java @@ -1,6 +1,9 @@ +/** + * Copyright (c) 2016-2023 Deephaven Data Labs and Patent Pending + */ package io.deephaven.parquet.table.transfer; -import io.deephaven.chunk.WritableObjectChunk; +import io.deephaven.chunk.ObjectChunk; import io.deephaven.chunk.attributes.Values; import io.deephaven.engine.rowset.RowSequence; import io.deephaven.engine.table.ColumnSource; @@ -10,7 +13,7 @@ import java.nio.LongBuffer; import java.time.LocalTime; -final class TimeTransfer extends GettingPrimitiveTransfer, LongBuffer> { +final class TimeTransfer extends GettingPrimitiveTransfer, LongBuffer> { TimeTransfer(@NotNull final ColumnSource columnSource, @NotNull final RowSequence tableRowSet, final int 
targetPageSizeInBytes) { @@ -23,7 +26,6 @@ final class TimeTransfer extends GettingPrimitiveTransfer TransferObject create( if (columnType == LocalTime.class) { return new TimeTransfer(columnSource, tableRowSet, instructions.getTargetPageSize()); } + if (columnType == LocalDateTime.class) { + return new LocalDateTimeTransfer(columnSource, tableRowSet, instructions.getTargetPageSize()); + } @Nullable final Class componentType = columnSource.getComponentType(); if (columnType.isArray()) { @@ -140,6 +144,9 @@ static TransferObject create( if (componentType == LocalTime.class) { return new TimeArrayTransfer(columnSource, tableRowSet, instructions.getTargetPageSize()); } + if (componentType == LocalDateTime.class) { + return new LocalDateTimeArrayTransfer(columnSource, tableRowSet, instructions.getTargetPageSize()); + } // TODO(deephaven-core#4612): Handle arrays of BigDecimal and if explicit codec provided } if (Vector.class.isAssignableFrom(columnType)) { @@ -183,6 +190,9 @@ static TransferObject create( if (componentType == LocalTime.class) { return new TimeVectorTransfer(columnSource, tableRowSet, instructions.getTargetPageSize()); } + if (componentType == LocalDateTime.class) { + return new LocalDateTimeVectorTransfer(columnSource, tableRowSet, instructions.getTargetPageSize()); + } // TODO(deephaven-core#4612): Handle vectors of BigDecimal and if explicit codec provided } diff --git a/extensions/parquet/table/src/test/e0.py b/extensions/parquet/table/src/test/e0.py index f6cc32c323d..09416337baa 100644 --- a/extensions/parquet/table/src/test/e0.py +++ b/extensions/parquet/table/src/test/e0.py @@ -8,8 +8,7 @@ "c": np.arange(3, 6).astype("u1"), "d": np.arange(4.0, 7.0, dtype="float64"), "e": [True, False, True], - # TODO(deephaven-core#976): Unable to read parquet TimestampLogicalTypeAnnotation that is not adjusted to UTC - # "f": pd.date_range("20130101", periods=3), + "f": pd.date_range("20130101", periods=3), "g": pd.date_range("20130101", periods=3, 
tz="US/Eastern"), "h": pd.Categorical(list("abc")), "i": pd.Categorical(list("abc"), ordered=True), diff --git a/extensions/parquet/table/src/test/e1.py b/extensions/parquet/table/src/test/e1.py index 450179b49e1..408c327f3a8 100644 --- a/extensions/parquet/table/src/test/e1.py +++ b/extensions/parquet/table/src/test/e1.py @@ -8,8 +8,7 @@ "c": np.arange(3, 6).astype("u1"), "d": np.arange(4.0, 7.0, dtype="float64"), "e": [True, False, True], - # TODO(deephaven-core#976): Unable to read parquet TimestampLogicalTypeAnnotation that is not adjusted to UTC - # "f": pd.date_range("20130101", periods=3), + "f": pd.date_range("20130101", periods=3), "g": pd.date_range("20130101", periods=3, tz="US/Eastern"), "h": pd.Categorical(list("abc")), "i": pd.Categorical(list("abc"), ordered=True), diff --git a/extensions/parquet/table/src/test/e2.py b/extensions/parquet/table/src/test/e2.py index 9fa3560a1e0..446fb28519a 100644 --- a/extensions/parquet/table/src/test/e2.py +++ b/extensions/parquet/table/src/test/e2.py @@ -8,8 +8,7 @@ "c": np.arange(3, 6).astype("u1"), "d": np.arange(4.0, 7.0, dtype="float64"), "e": [True, False, True], - # TODO(deephaven-core#976): Unable to read parquet TimestampLogicalTypeAnnotation that is not adjusted to UTC - # "f": pd.date_range("20130101", periods=3), + "f": pd.date_range("20130101", periods=3), "g": pd.date_range("20130101", periods=3, tz="US/Eastern"), "h": pd.Categorical(list("abc")), "i": pd.Categorical(list("abc"), ordered=True), diff --git a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java index c578137e336..026617ed081 100644 --- a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java +++ b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java @@ -62,7 +62,6 @@ import java.math.BigDecimal; import 
java.math.BigInteger; import java.time.Instant; -import java.time.LocalTime; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -129,6 +128,7 @@ private static Table getTableFlat(int size, boolean includeSerializable, boolean "someBiColumn = java.math.BigInteger.valueOf(ii)", "someDateColumn = i % 10 == 0 ? null : java.time.LocalDate.ofEpochDay(i)", "someTimeColumn = i % 10 == 0 ? null : java.time.LocalTime.of(i%24, i%60, (i+10)%60)", + "someDateTimeColumn = i % 10 == 0 ? null : java.time.LocalDateTime.of(2000+i%10, i%12+1, i%30+1, (i+4)%24, (i+5)%60, (i+6)%60, i)", "nullKey = i < -1?`123`:null", "nullIntColumn = (int)null", "nullLongColumn = (long)null", @@ -507,9 +507,10 @@ public void testArrayColumns() { "someByteArrayColumn = new byte[] {i % 10 == 0 ? null : (byte)i}", "someCharArrayColumn = new char[] {i % 10 == 0 ? null : (char)i}", "someTimeArrayColumn = new Instant[] {i % 10 == 0 ? null : (Instant)DateTimeUtils.now() + i}", - "someBiColumn = new java.math.BigInteger[] {i % 10 == 0 ? null : java.math.BigInteger.valueOf(i)}", - "someDateColumn = new java.time.LocalDate[] {i % 10 == 0 ? null : java.time.LocalDate.ofEpochDay(i)}", - "someTimeColumn = new java.time.LocalTime[] {i % 10 == 0 ? null : java.time.LocalTime.of(i%24, i%60, (i+10)%60)}", + "someBiArrayColumn = new java.math.BigInteger[] {i % 10 == 0 ? null : java.math.BigInteger.valueOf(i)}", + "someDateArrayColumn = new java.time.LocalDate[] {i % 10 == 0 ? null : java.time.LocalDate.ofEpochDay(i)}", + "someTimeArrayColumn = new java.time.LocalTime[] {i % 10 == 0 ? null : java.time.LocalTime.of(i%24, i%60, (i+10)%60)}", + "someDateTimeArrayColumn = new java.time.LocalDateTime[] {i % 10 == 0 ? 
null : java.time.LocalDateTime.of(2000+i%10, i%12+1, i%30+1, (i+4)%24, (i+5)%60, (i+6)%60, i)}", "nullStringArrayColumn = new String[] {(String)null}", "nullIntArrayColumn = new int[] {(int)null}", "nullLongArrayColumn = new long[] {(long)null}", @@ -1271,8 +1272,10 @@ public void readWriteStatisticsTest() { public void readWriteDateTimeTest() { final int NUM_ROWS = 1000; final Table table = TableTools.emptyTable(NUM_ROWS).view( - "someDateColumn = i % 10 == 0 ? null : java.time.LocalDate.ofEpochDay(i)", - "someTimeColumn = i % 10 == 0 ? null : java.time.LocalTime.of(i%24, i%60, (i+10)%60)"); + "someDateColumn = java.time.LocalDate.ofEpochDay(i)", + "someTimeColumn = java.time.LocalTime.of(i%24, i%60, (i+10)%60)", + "someLocalDateTimeColumn = java.time.LocalDateTime.of(2000+i%10, i%12+1, i%30+1, (i+4)%24, (i+5)%60, (i+6)%60, i)", + "someInstantColumn = DateTimeUtils.now() + i").select(); final File dest = new File(rootFile, "readWriteDateTimeTest.parquet"); writeReadTableTest(table, dest); @@ -1286,9 +1289,22 @@ public void readWriteDateTimeTest() { final ColumnChunkMetaData timeColMetadata = metadata.getBlocks().get(0).getColumns().get(1); assertTrue(timeColMetadata.toString().contains("someTimeColumn")); assertEquals(PrimitiveType.PrimitiveTypeName.INT64, timeColMetadata.getPrimitiveType().getPrimitiveTypeName()); - final boolean isAdjustedToUTC = true; - assertEquals(LogicalTypeAnnotation.timeType(isAdjustedToUTC, LogicalTypeAnnotation.TimeUnit.NANOS), + assertEquals(LogicalTypeAnnotation.timeType(true, LogicalTypeAnnotation.TimeUnit.NANOS), timeColMetadata.getPrimitiveType().getLogicalTypeAnnotation()); + + final ColumnChunkMetaData localDateTimeColMetadata = metadata.getBlocks().get(0).getColumns().get(2); + assertTrue(localDateTimeColMetadata.toString().contains("someLocalDateTimeColumn")); + assertEquals(PrimitiveType.PrimitiveTypeName.INT64, + localDateTimeColMetadata.getPrimitiveType().getPrimitiveTypeName()); + 
assertEquals(LogicalTypeAnnotation.timestampType(false, LogicalTypeAnnotation.TimeUnit.NANOS), + localDateTimeColMetadata.getPrimitiveType().getLogicalTypeAnnotation()); + + final ColumnChunkMetaData instantColMetadata = metadata.getBlocks().get(0).getColumns().get(3); + assertTrue(instantColMetadata.toString().contains("someInstantColumn")); + assertEquals(PrimitiveType.PrimitiveTypeName.INT64, + instantColMetadata.getPrimitiveType().getPrimitiveTypeName()); + assertEquals(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.NANOS), + instantColMetadata.getPrimitiveType().getLogicalTypeAnnotation()); } /** diff --git a/extensions/parquet/table/src/test/resources/e0/brotli.parquet b/extensions/parquet/table/src/test/resources/e0/brotli.parquet index e26a64f8c61..0fcda0a3ec7 100644 --- a/extensions/parquet/table/src/test/resources/e0/brotli.parquet +++ b/extensions/parquet/table/src/test/resources/e0/brotli.parquet @@ -1,3 +1,3 @@ version https://git-lfs.github.com/spec/v1 -oid sha256:c7b823aa5e020d7d17cf60d9d34dcdbf87578a56eb3ba20bf8a9e540a2a6d6a9 -size 5751 +oid sha256:9ffb97b406a6b35340d8d7c7d59702b85a0eda9055fb11e3b96b83bab4172e37 +size 6399 diff --git a/extensions/parquet/table/src/test/resources/e0/gzip.parquet b/extensions/parquet/table/src/test/resources/e0/gzip.parquet index 39cd91a4ee3..0c3e6ffb24a 100644 --- a/extensions/parquet/table/src/test/resources/e0/gzip.parquet +++ b/extensions/parquet/table/src/test/resources/e0/gzip.parquet @@ -1,3 +1,3 @@ version https://git-lfs.github.com/spec/v1 -oid sha256:91e447006fe6655210561391e0abb9eb72d5984b81b9eaee587739f450c52b8d -size 5962 +oid sha256:81ef4ceeabaf58d72201d2a6a3273f8874646f7101b7a3fc5ae6d01479ae9b3a +size 6639 diff --git a/extensions/parquet/table/src/test/resources/e0/lz4.parquet b/extensions/parquet/table/src/test/resources/e0/lz4.parquet index 3dd0120a98c..18f29cea78d 100644 --- a/extensions/parquet/table/src/test/resources/e0/lz4.parquet +++ 
b/extensions/parquet/table/src/test/resources/e0/lz4.parquet @@ -1,3 +1,3 @@ version https://git-lfs.github.com/spec/v1 -oid sha256:9ccbcc31974798fa363cb7ef8d4a6630d14e7ebdc44716a857c38d963c37077c -size 5718 +oid sha256:4019e7677d826e82069439a4a78513713c95d4c7130bb6af8a86220d11a59306 +size 6361 diff --git a/extensions/parquet/table/src/test/resources/e0/snappy.parquet b/extensions/parquet/table/src/test/resources/e0/snappy.parquet index dd0ef9114a8..6b2f0743603 100644 --- a/extensions/parquet/table/src/test/resources/e0/snappy.parquet +++ b/extensions/parquet/table/src/test/resources/e0/snappy.parquet @@ -1,3 +1,3 @@ version https://git-lfs.github.com/spec/v1 -oid sha256:d941980a47287b30d526f4d826b6e23cb7b67c2f1a69e6fc094fb763cfdcbb22 -size 5732 +oid sha256:b3d39d73e7cecc035b5574e8a84dfd1583ab3fdc9896cb1a02608728e9956392 +size 6376 diff --git a/extensions/parquet/table/src/test/resources/e0/uncompressed.parquet b/extensions/parquet/table/src/test/resources/e0/uncompressed.parquet index cc6d359aa9e..84734f0d18a 100644 --- a/extensions/parquet/table/src/test/resources/e0/uncompressed.parquet +++ b/extensions/parquet/table/src/test/resources/e0/uncompressed.parquet @@ -1,3 +1,3 @@ version https://git-lfs.github.com/spec/v1 -oid sha256:72822289b3fadc5d19cafb317d176cd944b8aa2e43916988b20c93a327bf24c5 -size 5709 +oid sha256:0e27404e9e31261706142b18ed4968f4fd0237443002803f891c5a32f23f8998 +size 6349 diff --git a/extensions/parquet/table/src/test/resources/e0/zstd.parquet b/extensions/parquet/table/src/test/resources/e0/zstd.parquet index 20e8f23b9a2..cc65c3e146b 100644 --- a/extensions/parquet/table/src/test/resources/e0/zstd.parquet +++ b/extensions/parquet/table/src/test/resources/e0/zstd.parquet @@ -1,3 +1,3 @@ version https://git-lfs.github.com/spec/v1 -oid sha256:86ceca58808e97b3454c689468ca987cdfe8651f264a6ee6cff7880b99a1fba5 -size 5836 +oid sha256:efd4cd7542780599d1e6ff55d1945745652e454167412482024a48b478245bec +size 6494 diff --git 
a/extensions/parquet/table/src/test/resources/e1/brotli.parquet b/extensions/parquet/table/src/test/resources/e1/brotli.parquet index a91acfd4788..818f4ac6d6a 100644 --- a/extensions/parquet/table/src/test/resources/e1/brotli.parquet +++ b/extensions/parquet/table/src/test/resources/e1/brotli.parquet @@ -1,3 +1,3 @@ version https://git-lfs.github.com/spec/v1 -oid sha256:38834b6a8ca6ffb787732878bfe48c677bb78885610916e83a7186bae2191471 -size 5752 +oid sha256:a232f7c7689b3b87bb2722d939ee9cbfdec14159a31e14bda7e6889ba331ee54 +size 6400 diff --git a/extensions/parquet/table/src/test/resources/e1/gzip.parquet b/extensions/parquet/table/src/test/resources/e1/gzip.parquet index c38f1487f95..6204bf4ff27 100644 --- a/extensions/parquet/table/src/test/resources/e1/gzip.parquet +++ b/extensions/parquet/table/src/test/resources/e1/gzip.parquet @@ -1,3 +1,3 @@ version https://git-lfs.github.com/spec/v1 -oid sha256:5742a0708f44531110116dd970c5b9e0a934bd5737d295299ee2bf6d623c435f -size 5963 +oid sha256:ac3fe36a375694571e96deb2ec6532c3f6c2370a1e2d3e2e25d1364d722a863e +size 6640 diff --git a/extensions/parquet/table/src/test/resources/e1/lz4.parquet b/extensions/parquet/table/src/test/resources/e1/lz4.parquet index 6baaaf61cd0..68ff7618be6 100644 --- a/extensions/parquet/table/src/test/resources/e1/lz4.parquet +++ b/extensions/parquet/table/src/test/resources/e1/lz4.parquet @@ -1,3 +1,3 @@ version https://git-lfs.github.com/spec/v1 -oid sha256:1d157fa3942727b0fb0abbec859957e77c2b4371b2d5f5ed5a196eca062f21c8 -size 5719 +oid sha256:a2e43a73f87be35f31f255662daf9d6e0eb2cdd7ecf186ad9202f3bdd265af11 +size 6362 diff --git a/extensions/parquet/table/src/test/resources/e1/snappy.parquet b/extensions/parquet/table/src/test/resources/e1/snappy.parquet index e914e31503c..48aa69e58fd 100644 --- a/extensions/parquet/table/src/test/resources/e1/snappy.parquet +++ b/extensions/parquet/table/src/test/resources/e1/snappy.parquet @@ -1,3 +1,3 @@ version https://git-lfs.github.com/spec/v1 -oid 
sha256:6f065dfbde45b490c53b99f7af77aea472f0b93b8c6627319ab73494ba9b6bd4 -size 5733 +oid sha256:4ee17abd8fa80d117e29aac6165bcaaa7d206c94c43d7f8bfec1fd91620244f8 +size 6377 diff --git a/extensions/parquet/table/src/test/resources/e1/uncompressed.parquet b/extensions/parquet/table/src/test/resources/e1/uncompressed.parquet index 94ee35905cd..d86cd939673 100644 --- a/extensions/parquet/table/src/test/resources/e1/uncompressed.parquet +++ b/extensions/parquet/table/src/test/resources/e1/uncompressed.parquet @@ -1,3 +1,3 @@ version https://git-lfs.github.com/spec/v1 -oid sha256:cc55e3df0771ca17eaaf2d39f11759c203d8d96746beca3055090449bb67bf00 -size 5710 +oid sha256:c5cc2852ee31f844057ec8c5e14ce6dc94a42165ce94dc0a71ebcdb88eb39c17 +size 6350 diff --git a/extensions/parquet/table/src/test/resources/e1/zstd.parquet b/extensions/parquet/table/src/test/resources/e1/zstd.parquet index b654640e0f6..fd64a4776ff 100644 --- a/extensions/parquet/table/src/test/resources/e1/zstd.parquet +++ b/extensions/parquet/table/src/test/resources/e1/zstd.parquet @@ -1,3 +1,3 @@ version https://git-lfs.github.com/spec/v1 -oid sha256:2d13ef75d368abe191c89dce35a1266afa38116b171c406903648e01fa079a71 -size 5841 +oid sha256:94d3ecb405d8c1e13f3f636ccdef0963cde18aaf7cbf70183d9848ede8ec6241 +size 6499 diff --git a/extensions/parquet/table/src/test/resources/e2/brotli.parquet b/extensions/parquet/table/src/test/resources/e2/brotli.parquet index 2fde3adac4d..56a7c8f5975 100644 --- a/extensions/parquet/table/src/test/resources/e2/brotli.parquet +++ b/extensions/parquet/table/src/test/resources/e2/brotli.parquet @@ -1,3 +1,3 @@ version https://git-lfs.github.com/spec/v1 -oid sha256:94ee47283962486ad03d8c5edb195dc4bd14316258e1daf579de0bdac2b02bf3 -size 2375 +oid sha256:5d4cfc16fe3ab70ba5dff2e7cbc4e69f2245e7729240f10519c6e51f6aeab375 +size 2623 diff --git a/extensions/parquet/table/src/test/resources/e2/gzip.parquet b/extensions/parquet/table/src/test/resources/e2/gzip.parquet index 16c359740dd..8f31017a389 
100644 --- a/extensions/parquet/table/src/test/resources/e2/gzip.parquet +++ b/extensions/parquet/table/src/test/resources/e2/gzip.parquet @@ -1,3 +1,3 @@ version https://git-lfs.github.com/spec/v1 -oid sha256:725e4d9e60f33ae2da352124e68c195753c929cebe3b87c27d8814a11bfef3d4 -size 2519 +oid sha256:b20a2c66b37d8cb8d9775fb022e419dffee76e80519169e7d47b8bdee65b2fc1 +size 2775 diff --git a/extensions/parquet/table/src/test/resources/e2/lz4.parquet b/extensions/parquet/table/src/test/resources/e2/lz4.parquet index ac80cd17b63..047397beb8d 100644 --- a/extensions/parquet/table/src/test/resources/e2/lz4.parquet +++ b/extensions/parquet/table/src/test/resources/e2/lz4.parquet @@ -1,3 +1,3 @@ version https://git-lfs.github.com/spec/v1 -oid sha256:c1b0e3853ae0479aae8dd9f2b4007dbf9106b2e5bcc88147d1174e7dec2bfcd4 -size 2419 +oid sha256:9e79e0aab241bbead0f106464c5059cdea9c9e6a21bf69be3796418e4af615fa +size 2664 diff --git a/extensions/parquet/table/src/test/resources/e2/snappy.parquet b/extensions/parquet/table/src/test/resources/e2/snappy.parquet index 2b0b2a54217..d689d97bd96 100644 --- a/extensions/parquet/table/src/test/resources/e2/snappy.parquet +++ b/extensions/parquet/table/src/test/resources/e2/snappy.parquet @@ -1,3 +1,3 @@ version https://git-lfs.github.com/spec/v1 -oid sha256:729c42915bbe71ae43c2d1cec6e1755043b5793f5b4aaf25e21fbb24674b9406 -size 2422 +oid sha256:9524c4cabf64762de3be70e6926e978c3466f4404d48cf14654e095b96b5e170 +size 2667 diff --git a/extensions/parquet/table/src/test/resources/e2/uncompressed.parquet b/extensions/parquet/table/src/test/resources/e2/uncompressed.parquet index 1b18f5ec570..865a2c16188 100644 --- a/extensions/parquet/table/src/test/resources/e2/uncompressed.parquet +++ b/extensions/parquet/table/src/test/resources/e2/uncompressed.parquet @@ -1,3 +1,3 @@ version https://git-lfs.github.com/spec/v1 -oid sha256:b93a2235fc98f3a05365a13c180bd51237eceef43d717c4b772911a3782cb335 -size 2419 +oid 
sha256:a523c7ca94006d72739cb489c4e527a593617295d492c8b824852f3e1c786c77 +size 2663 diff --git a/extensions/parquet/table/src/test/resources/e2/zstd.parquet b/extensions/parquet/table/src/test/resources/e2/zstd.parquet index aa9a74dc97f..449da5ebe7f 100644 --- a/extensions/parquet/table/src/test/resources/e2/zstd.parquet +++ b/extensions/parquet/table/src/test/resources/e2/zstd.parquet @@ -1,3 +1,3 @@ version https://git-lfs.github.com/spec/v1 -oid sha256:326594350ab774ea7d42106c57181c7b831ee932ba56543a0ebfea6079d7d8ef -size 2476 +oid sha256:3e18d0a97ba9e39b74abcac070984db144eba3879d2d7cde43cd22471aa5feaa +size 2730 diff --git a/py/server/tests/test_parquet.py b/py/server/tests/test_parquet.py index c921a7e02a4..b2cd933740b 100644 --- a/py/server/tests/test_parquet.py +++ b/py/server/tests/test_parquet.py @@ -316,25 +316,6 @@ def test_writing_lists_via_pyarrow(self): pa_table_from_disk = dharrow.to_arrow(from_disk) self.assertTrue(pa_table.equals(pa_table_from_disk)) - def test_writing_time_via_pyarrow(self): - def _test_writing_time_helper(filename): - metadata = pyarrow.parquet.read_metadata(filename) - if "isAdjustedToUTC=false" in str(metadata.row_group(0).column(0)): - # TODO(deephaven-core#976): Unable to read non UTC adjusted timestamps - with self.assertRaises(DHError) as e: - read(filename) - self.assertIn("ParquetFileReaderException", e.exception.root_cause) - - df = pandas.DataFrame({ - "f": pandas.date_range("20130101", periods=3), - }) - df.to_parquet("pyarrow_26.parquet", engine='pyarrow', compression=None, version='2.6') - _test_writing_time_helper("pyarrow_26.parquet") - df.to_parquet("pyarrow_24.parquet", engine='pyarrow', compression=None, version='2.4') - _test_writing_time_helper("pyarrow_24.parquet") - df.to_parquet("pyarrow_10.parquet", engine='pyarrow', compression=None, version='1.0') - _test_writing_time_helper("pyarrow_10.parquet") - def test_dictionary_encoding(self): dh_table = empty_table(10).update(formulas=[ "shortStringColumn = `Row ` 
+ i", @@ -406,7 +387,7 @@ def time_test_helper(pa_table, new_schema, dest): df_from_disk = to_pandas(from_disk) original_df = pa_table.to_pandas() # Compare the dataframes as strings - print((df_from_disk.astype(str) == original_df.astype(str)).all().values.all()) + self.assertTrue((df_from_disk.astype(str) == original_df.astype(str)).all().values.all()) # Test for nanoseconds, microseconds, and milliseconds schema_nsec = table.schema.set(0, pyarrow.field('someTimeColumn', pyarrow.time64('ns'))) @@ -418,5 +399,40 @@ def time_test_helper(pa_table, new_schema, dest): schema_msec = table.schema.set(0, pyarrow.field('someTimeColumn', pyarrow.time32('ms'))) time_test_helper(table, schema_msec, "data_from_pq_msec.parquet") + def test_non_utc_adjusted_timestamps(self): + """ Test that we can read and write timestamp columns with isAdjustedToUTC set as false and different units """ + df = pandas.DataFrame({ + "f": pandas.date_range("11:00:00", "11:00:01", freq="1ms") + }) + # Sprinkle some nulls + df["f"][0] = df["f"][5] = None + table = pyarrow.Table.from_pandas(df) + + def timestamp_test_helper(pa_table, new_schema, dest): + # Cast the table to new schema and write it using pyarrow + pa_table = pa_table.cast(new_schema) + pyarrow.parquet.write_table(pa_table, dest) + # Verify that isAdjustedToUTC is set as false in the metadata + metadata = pyarrow.parquet.read_metadata(dest) + if "isAdjustedToUTC=false" not in str(metadata.row_group(0).column(0)): + self.fail("isAdjustedToUTC is not set to false") + # Read the parquet file back using deephaven and write it back + dh_table_from_disk = read(dest) + dh_dest = "dh_" + dest + write(dh_table_from_disk, dh_dest) + # Read the new parquet file using pyarrow and compare against original table + pa_table_from_disk = pyarrow.parquet.read_table(dh_dest) + self.assertTrue(pa_table == pa_table_from_disk.cast(new_schema)) + + schema_nsec = table.schema.set(0, pyarrow.field('f', pyarrow.timestamp('ns'))) + timestamp_test_helper(table, 
schema_nsec, 'timestamp_test_nsec.parquet') + + schema_usec = table.schema.set(0, pyarrow.field('f', pyarrow.timestamp('us'))) + timestamp_test_helper(table, schema_usec, 'timestamp_test_usec.parquet') + + schema_msec = table.schema.set(0, pyarrow.field('f', pyarrow.timestamp('ms'))) + timestamp_test_helper(table, schema_msec, 'timestamp_test_msec.parquet') + + if __name__ == '__main__': unittest.main() diff --git a/replication/static/src/main/java/io/deephaven/replicators/ReplicateParquetTransferObjects.java b/replication/static/src/main/java/io/deephaven/replicators/ReplicateParquetTransferObjects.java index 1a90d784880..eb8333ebfeb 100644 --- a/replication/static/src/main/java/io/deephaven/replicators/ReplicateParquetTransferObjects.java +++ b/replication/static/src/main/java/io/deephaven/replicators/ReplicateParquetTransferObjects.java @@ -20,9 +20,17 @@ public class ReplicateParquetTransferObjects { private static final String PARQUET_INSTANT_VECTOR_TRANSFER_PATH = PARQUET_TRANSFER_DIR + "InstantVectorTransfer.java"; + private static final String PARQUET_LOCAL_DATE_TIME_TRANSFER_PATH = + PARQUET_TRANSFER_DIR + "LocalDateTimeTransfer.java"; + private static final String PARQUET_LOCAL_DATE_TIME_ARRAY_TRANSFER_PATH = + PARQUET_TRANSFER_DIR + "LocalDateTimeArrayTransfer.java"; + private static final String PARQUET_LOCAL_DATE_TIME_VECTOR_TRANSFER_PATH = + PARQUET_TRANSFER_DIR + "LocalDateTimeVectorTransfer.java"; + private static final String PARQUET_DATE_ARRAY_TRANSFER_PATH = PARQUET_TRANSFER_DIR + "DateArrayTransfer.java"; private static final String PARQUET_DATE_VECTOR_TRANSFER_PATH = PARQUET_TRANSFER_DIR + "DateVectorTransfer.java"; + private static final String PARQUET_TIME_TRANSFER_PATH = PARQUET_TRANSFER_DIR + "TimeTransfer.java"; private static final String PARQUET_TIME_ARRAY_TRANSFER_PATH = PARQUET_TRANSFER_DIR + "TimeArrayTransfer.java"; private static final String PARQUET_TIME_VECTOR_TRANSFER_PATH = PARQUET_TRANSFER_DIR + "TimeVectorTransfer.java"; @@ 
-59,6 +67,25 @@ public static void main(String[] args) throws IOException { }; replaceAll(PARQUET_INSTANT_ARRAY_TRANSFER_PATH, PARQUET_TIME_ARRAY_TRANSFER_PATH, null, NO_EXCEPTIONS, pairs); replaceAll(PARQUET_INSTANT_VECTOR_TRANSFER_PATH, PARQUET_TIME_VECTOR_TRANSFER_PATH, null, NO_EXCEPTIONS, pairs); + + pairs = new String[][] { + {"InstantArrayTransfer", "LocalDateTimeArrayTransfer"}, + {"InstantVectorTransfer", "LocalDateTimeVectorTransfer"}, + {"DateTimeUtils.epochNanos", "DateTimeUtils.epochNanosUTC"}, + {"Instant", "LocalDateTime"} + }; + replaceAll(PARQUET_INSTANT_ARRAY_TRANSFER_PATH, PARQUET_LOCAL_DATE_TIME_ARRAY_TRANSFER_PATH, null, + NO_EXCEPTIONS, pairs); + replaceAll(PARQUET_INSTANT_VECTOR_TRANSFER_PATH, PARQUET_LOCAL_DATE_TIME_VECTOR_TRANSFER_PATH, null, + NO_EXCEPTIONS, pairs); + + pairs = new String[][] { + {"TimeTransfer", "LocalDateTimeTransfer"}, + {"LocalTime", "LocalDateTime"}, + {"DateTimeUtils.nanosOfDay", "DateTimeUtils.epochNanosUTC"} + }; + replaceAll(PARQUET_TIME_TRANSFER_PATH, PARQUET_LOCAL_DATE_TIME_TRANSFER_PATH, null, NO_EXCEPTIONS, pairs); + // Additional differences can be generated by Spotless } }