Skip to content

Commit

Permalink
Bug fixes for Parquet DATE and TIME, and improved support for TIMESTA…
Browse files Browse the repository at this point in the history
…MP (#4801)
  • Loading branch information
malhotrashivam authored Nov 10, 2023
1 parent 318c2e2 commit d162c89
Show file tree
Hide file tree
Showing 39 changed files with 527 additions and 183 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalTime;
import java.time.LocalDateTime;

/**
* Utility class to concentrate {@link ObjectCodec} lookups.
Expand Down Expand Up @@ -76,6 +77,7 @@ private static boolean noCodecRequired(@NotNull final Class<?> dataType) {
dataType == Instant.class ||
dataType == LocalDate.class ||
dataType == LocalTime.class ||
dataType == LocalDateTime.class ||
dataType == String.class ||
// A BigDecimal column maps to a logical type of decimal, with
// appropriate precision and scale calculated from column data,
Expand Down
53 changes: 53 additions & 0 deletions engine/time/src/main/java/io/deephaven/time/DateTimeUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.time.zone.ZoneRulesException;
import java.util.Date;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

Expand Down Expand Up @@ -981,6 +982,21 @@ public static long epochNanos(@Nullable final ZonedDateTime dateTime) {
return safeComputeNanos(dateTime.toEpochSecond(), dateTime.getNano());
}

/**
* Returns nanoseconds from the Epoch for a {@link LocalDateTime} value in UTC timezone.
*
* @param localDateTime the local date time to compute the Epoch offset for
* @return nanoseconds since Epoch, or a NULL_LONG value if the local date time is null
*/
@ScriptApi
public static long epochNanosUTC(@Nullable final LocalDateTime localDateTime) {
if (localDateTime == null) {
return NULL_LONG;
}
return TimeUnit.SECONDS.toNanos(localDateTime.toEpochSecond(ZoneOffset.UTC))
+ localDateTime.toLocalTime().getNano();
}

/**
* Returns microseconds from the Epoch for an {@link Instant} value.
*
Expand Down Expand Up @@ -1399,6 +1415,43 @@ public static ZonedDateTime excelToZonedDateTime(final double excel, @Nullable f
return epochMillisToZonedDateTime(excelTimeToEpochMillis(excel, timeZone), timeZone);
}

/**
* Converts nanoseconds from the Epoch to a {@link LocalDateTime} in UTC timezone.
*
* @param nanos nanoseconds since Epoch
* @return {@code null} if the input is {@link QueryConstants#NULL_LONG}; otherwise the input nanoseconds from the
* Epoch converted to a {@link LocalDateTime} in UTC timezone
*/
public static @Nullable LocalDateTime epochNanosToLocalDateTimeUTC(final long nanos) {
return nanos == NULL_LONG ? null
: LocalDateTime.ofEpochSecond(nanos / 1_000_000_000L, (int) (nanos % 1_000_000_000L), ZoneOffset.UTC);
}

/**
* Converts microseconds from the Epoch to a {@link LocalDateTime} in UTC timezone.
*
* @param micros microseconds since Epoch
* @return {@code null} if the input is {@link QueryConstants#NULL_LONG}; otherwise the input microseconds from the
* Epoch converted to a {@link LocalDateTime} in UTC timezone
*/
public static @Nullable LocalDateTime epochMicrosToLocalDateTimeUTC(final long micros) {
return micros == NULL_LONG ? null
: LocalDateTime.ofEpochSecond(micros / 1_000_000L, (int) ((micros % 1_000_000L) * MICRO),
ZoneOffset.UTC);
}

/**
* Converts milliseconds from the Epoch to a {@link LocalDateTime} in UTC timezone.
*
* @param millis milliseconds since Epoch
* @return {@code null} if the input is {@link QueryConstants#NULL_LONG}; otherwise the input milliseconds from the
* Epoch converted to a {@link LocalDateTime} in UTC timezone
*/
public static @Nullable LocalDateTime epochMillisToLocalDateTimeUTC(final long millis) {
return millis == NULL_LONG ? null
: LocalDateTime.ofEpochSecond(millis / 1_000L, (int) ((millis % 1_000L) * MILLI), ZoneOffset.UTC);
}

// endregion

// region Arithmetic
Expand Down
16 changes: 16 additions & 0 deletions engine/time/src/test/java/io/deephaven/time/TestDateTimeUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -1391,6 +1391,10 @@ public void testEpochNanos() {

TestCase.assertEquals(nanos, DateTimeUtils.epochNanos(dt3));
TestCase.assertEquals(NULL_LONG, DateTimeUtils.epochNanos((ZonedDateTime) null));

final LocalDateTime ldt = LocalDateTime.ofInstant(dt2, ZoneId.of("UTC"));
TestCase.assertEquals(nanos, DateTimeUtils.epochNanosUTC(ldt));
TestCase.assertEquals(NULL_LONG, DateTimeUtils.epochNanosUTC(null));
}

public void testEpochMicros() {
Expand Down Expand Up @@ -1456,6 +1460,10 @@ public void testEpochNanosTo() {
TestCase.assertEquals(dt3, DateTimeUtils.epochNanosToZonedDateTime(nanos, TZ_JP));
TestCase.assertNull(DateTimeUtils.epochNanosToZonedDateTime(NULL_LONG, TZ_JP));
TestCase.assertNull(DateTimeUtils.epochNanosToZonedDateTime(nanos, null));

final LocalDateTime ldt = LocalDateTime.ofInstant(dt2, ZoneId.of("UTC"));
TestCase.assertEquals(ldt, DateTimeUtils.epochNanosToLocalDateTimeUTC(nanos));
TestCase.assertNull(DateTimeUtils.epochNanosToLocalDateTimeUTC(NULL_LONG));
}

public void testEpochMicrosTo() {
Expand All @@ -1471,6 +1479,10 @@ public void testEpochMicrosTo() {
TestCase.assertEquals(dt3, DateTimeUtils.epochMicrosToZonedDateTime(micros, TZ_JP));
TestCase.assertNull(DateTimeUtils.epochMicrosToZonedDateTime(NULL_LONG, TZ_JP));
TestCase.assertNull(DateTimeUtils.epochMicrosToZonedDateTime(micros, null));

final LocalDateTime ldt = LocalDateTime.ofInstant(dt2, ZoneId.of("UTC"));
TestCase.assertEquals(ldt, DateTimeUtils.epochMicrosToLocalDateTimeUTC(micros));
TestCase.assertNull(DateTimeUtils.epochMicrosToLocalDateTimeUTC(NULL_LONG));
}

public void testEpochMillisTo() {
Expand All @@ -1486,6 +1498,10 @@ public void testEpochMillisTo() {
TestCase.assertEquals(dt3, DateTimeUtils.epochMillisToZonedDateTime(millis, TZ_JP));
TestCase.assertNull(DateTimeUtils.epochMillisToZonedDateTime(NULL_LONG, TZ_JP));
TestCase.assertNull(DateTimeUtils.epochMillisToZonedDateTime(millis, null));

final LocalDateTime ldt = LocalDateTime.ofInstant(dt2, ZoneId.of("UTC"));
TestCase.assertEquals(ldt, DateTimeUtils.epochMillisToLocalDateTimeUTC(millis));
TestCase.assertNull(DateTimeUtils.epochMillisToLocalDateTimeUTC(NULL_LONG));
}

public void testEpochSecondsTo() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,23 +236,13 @@ private static void buildChildren(Types.GroupBuilder builder, Iterator<SchemaEle
}

if (schemaElement.isSetLogicalType()) {
LogicalType logicalType = schemaElement.logicalType;
if (logicalType.isSetTIMESTAMP()) {
TimestampType timestamp = logicalType.getTIMESTAMP();
if (!timestamp.isAdjustedToUTC) {
// TODO(deephaven-core#976): Unable to read non UTC adjusted timestamps
throw new ParquetFileReaderException(String.format(
"Only UTC timestamp is supported, found time column `%s` with isAdjustedToUTC=false",
schemaElement.getName()));
}
}
((Types.Builder) childBuilder).as(getLogicalTypeAnnotation(logicalType));
((Types.Builder) childBuilder).as(getLogicalTypeAnnotation(schemaElement.logicalType));
}

if (schemaElement.isSetConverted_type()) {
LogicalTypeAnnotation originalType =
getLogicalTypeAnnotation(schemaElement.converted_type, schemaElement);
LogicalTypeAnnotation newOriginalType = schemaElement.isSetLogicalType()
final LogicalTypeAnnotation originalType = getLogicalTypeAnnotation(
schemaElement.converted_type, schemaElement.logicalType, schemaElement);
final LogicalTypeAnnotation newOriginalType = schemaElement.isSetLogicalType()
&& getLogicalTypeAnnotation(schemaElement.logicalType) != null
? getLogicalTypeAnnotation(schemaElement.logicalType)
: null;
Expand Down Expand Up @@ -299,20 +289,20 @@ static LogicalTypeAnnotation getLogicalTypeAnnotation(LogicalType type) throws P
case LIST:
return LogicalTypeAnnotation.listType();
case TIME:
TimeType time = type.getTIME();
final TimeType time = type.getTIME();
return LogicalTypeAnnotation.timeType(time.isAdjustedToUTC, convertTimeUnit(time.unit));
case STRING:
return LogicalTypeAnnotation.stringType();
case DECIMAL:
DecimalType decimal = type.getDECIMAL();
final DecimalType decimal = type.getDECIMAL();
return LogicalTypeAnnotation.decimalType(decimal.scale, decimal.precision);
case INTEGER:
IntType integer = type.getINTEGER();
final IntType integer = type.getINTEGER();
return LogicalTypeAnnotation.intType(integer.bitWidth, integer.isSigned);
case UNKNOWN:
return null;
case TIMESTAMP:
TimestampType timestamp = type.getTIMESTAMP();
final TimestampType timestamp = type.getTIMESTAMP();
return LogicalTypeAnnotation.timestampType(timestamp.isAdjustedToUTC, convertTimeUnit(timestamp.unit));
default:
throw new ParquetFileReaderException("Unknown logical type " + type);
Expand Down Expand Up @@ -354,9 +344,9 @@ private static org.apache.parquet.schema.ColumnOrder fromParquetColumnOrder(Colu
return org.apache.parquet.schema.ColumnOrder.undefined();
}

private static LogicalTypeAnnotation getLogicalTypeAnnotation(ConvertedType type, SchemaElement schemaElement)
throws ParquetFileReaderException {
switch (type) {
private static LogicalTypeAnnotation getLogicalTypeAnnotation(final ConvertedType convertedType,
final LogicalType logicalType, final SchemaElement schemaElement) throws ParquetFileReaderException {
switch (convertedType) {
case UTF8:
return LogicalTypeAnnotation.stringType();
case MAP:
Expand All @@ -368,23 +358,23 @@ private static LogicalTypeAnnotation getLogicalTypeAnnotation(ConvertedType type
case ENUM:
return LogicalTypeAnnotation.enumType();
case DECIMAL:
int scale = schemaElement == null ? 0 : schemaElement.scale;
int precision = schemaElement == null ? 0 : schemaElement.precision;
final int scale = schemaElement == null ? 0 : schemaElement.scale;
final int precision = schemaElement == null ? 0 : schemaElement.precision;
return LogicalTypeAnnotation.decimalType(scale, precision);
case DATE:
return LogicalTypeAnnotation.dateType();
case TIME_MILLIS:
// TODO(deephaven-core#976) Assuming that time is adjusted to UTC
// isAdjustedToUTC parameter is ignored while reading Parquet TIME type, so disregard it here
return LogicalTypeAnnotation.timeType(true, LogicalTypeAnnotation.TimeUnit.MILLIS);
case TIME_MICROS:
// TODO(deephaven-core#976) Assuming that time is adjusted to UTC
return LogicalTypeAnnotation.timeType(true, LogicalTypeAnnotation.TimeUnit.MICROS);
case TIMESTAMP_MILLIS:
// TODO(deephaven-core#976) Assuming that time is adjusted to UTC
return LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS);
// Converted type doesn't have isAdjustedToUTC parameter, so use the information from logical type
return LogicalTypeAnnotation.timestampType(isAdjustedToUTC(logicalType),
LogicalTypeAnnotation.TimeUnit.MILLIS);
case TIMESTAMP_MICROS:
// TODO(deephaven-core#976) Assuming that time is adjusted to UTC
return LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MICROS);
return LogicalTypeAnnotation.timestampType(isAdjustedToUTC(logicalType),
LogicalTypeAnnotation.TimeUnit.MICROS);
case INTERVAL:
return LogicalTypeAnnotation.IntervalLogicalTypeAnnotation.getInstance();
case INT_8:
Expand All @@ -409,8 +399,21 @@ private static LogicalTypeAnnotation getLogicalTypeAnnotation(ConvertedType type
return LogicalTypeAnnotation.bsonType();
default:
throw new ParquetFileReaderException(
"Can't convert converted type to logical type, unknown converted type " + type);
"Can't convert converted type to logical type, unknown converted type " + convertedType);
}
}

/**
* Helper method to determine if a logical type is adjusted to UTC.
*
* @param logicalType the logical type to check
* @return true if the logical type is a timestamp adjusted to UTC, false otherwise
*/
private static boolean isAdjustedToUTC(final LogicalType logicalType) {
if (logicalType.getSetField() == LogicalType._Fields.TIMESTAMP) {
return logicalType.getTIMESTAMP().isAdjustedToUTC;
}
return false;
}

public MessageType getSchema() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.math.BigInteger;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.*;
import java.util.function.BiFunction;
Expand Down Expand Up @@ -350,15 +351,13 @@ public Optional<Class<?>> visit(final LogicalTypeAnnotation.TimeLogicalTypeAnnot
@Override
public Optional<Class<?>> visit(
final LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestampLogicalType) {
// TODO(deephaven-core#976): Unable to read parquet TimestampLogicalTypeAnnotation that is not adjusted
// to UTC
if (timestampLogicalType.isAdjustedToUTC()) {
switch (timestampLogicalType.getUnit()) {
case MILLIS:
case MICROS:
case NANOS:
return Optional.of(Instant.class);
}
switch (timestampLogicalType.getUnit()) {
case MILLIS:
case MICROS:
case NANOS:
// TIMESTAMP fields if adjusted to UTC are read as Instants, else as LocalDatetimes.
return timestampLogicalType.isAdjustedToUTC() ? Optional.of(Instant.class)
: Optional.of(LocalDateTime.class);
}
errorString.setValue("TimestampLogicalType, isAdjustedToUTC=" + timestampLogicalType.isAdjustedToUTC()
+ ", unit=" + timestampLogicalType.getUnit());
Expand Down
Loading

0 comments on commit d162c89

Please sign in to comment.