Skip to content

Commit

Permalink
[Improve][LocalFile] parquet use system timezone (apache#5605)
Browse files Browse the repository at this point in the history

---------

Co-authored-by: Jia Fan <fanjiaeminem@qq.com>
  • Loading branch information
liunaijie and Hisoka-X authored Oct 26, 2023
1 parent d0aff52 commit b3e1351
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
Expand Down Expand Up @@ -189,7 +189,10 @@ private Object resolveObject(Object data, SeaTunnelDataType<?> seaTunnelDataType
case DATE:
return data;
case TIMESTAMP:
return ((LocalDateTime) data).toInstant(ZoneOffset.of("+8")).toEpochMilli();
return ((LocalDateTime) data)
.atZone(ZoneId.systemDefault())
.toInstant()
.toEpochMilli();
case BYTES:
return ByteBuffer.wrap((byte[]) data);
case ROW:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ private Object resolveObject(Object field, SeaTunnelDataType<?> fieldType) {
return new Timestamp(timestamp).toLocalDateTime();
}
Instant instant = Instant.ofEpochMilli((long) field);
return LocalDateTime.ofInstant(instant, ZoneId.of("+8"));
return LocalDateTime.ofInstant(instant, ZoneId.systemDefault());
case ROW:
SeaTunnelRowType rowType = (SeaTunnelRowType) fieldType;
Object[] objects = new Object[rowType.getTotalFields()];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,10 @@
import java.io.File;
import java.net.URL;
import java.nio.file.Paths;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.TimeZone;

import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_DEFAULT;

Expand Down Expand Up @@ -70,6 +72,37 @@ public void testParquetRead2() throws Exception {
parquetReadStrategy.read(path, testCollector);
}

/**
 * Reads the same parquet file twice, once with the JVM default time zone set to Asia/Shanghai
 * and once with it set to UTC, and checks that the {@code c_timestamp} value of the first row
 * differs between the two reads by exactly the offset between those zones.
 *
 * <p>The JVM default time zone is process-wide state, so the value in effect before the test
 * is captured up front and restored in a {@code finally} block. This keeps the override from
 * leaking into other tests when this one passes, fails an assertion, or throws.
 */
@Test
public void testParquetReadUseSystemDefaultTimeZone() throws Exception {
    URL resource = ParquetReadStrategyTest.class.getResource("/timestamp_as_int64.parquet");
    Assertions.assertNotNull(resource);
    String path = Paths.get(resource.toURI()).toString();
    ParquetReadStrategy parquetReadStrategy = new ParquetReadStrategy();
    LocalConf localConf = new LocalConf(FS_DEFAULT_NAME_DEFAULT);
    parquetReadStrategy.init(localConf);
    SeaTunnelRowType seaTunnelRowTypeInfo =
            parquetReadStrategy.getSeaTunnelRowTypeInfo(localConf, path);
    Assertions.assertNotNull(seaTunnelRowTypeInfo);
    System.out.println(seaTunnelRowTypeInfo);
    int index = seaTunnelRowTypeInfo.indexOf("c_timestamp");

    // Remember the default in effect before we start overriding it.
    TimeZone originalTimeZone = TimeZone.getDefault();
    try {
        // First read: default zone is Asia/Shanghai.
        TimeZone tz1 = TimeZone.getTimeZone("Asia/Shanghai");
        TimeZone.setDefault(tz1);
        TestCollector testCollector = new TestCollector();
        parquetReadStrategy.read(path, testCollector);
        LocalDateTime time1 =
                (LocalDateTime) testCollector.getRows().get(0).getField(index);

        // Second read of the same file: default zone is UTC.
        TimeZone tz2 = TimeZone.getTimeZone("UTC");
        TimeZone.setDefault(tz2);
        TestCollector testCollector2 = new TestCollector();
        parquetReadStrategy.read(path, testCollector2);
        LocalDateTime time2 =
                (LocalDateTime) testCollector2.getRows().get(0).getField(index);

        // Asia/Shanghai is ahead of UTC, so the same instant has a later wall-clock time.
        Assertions.assertTrue(time1.isAfter(time2));
        // Re-expressing the Shanghai reading in UTC must give exactly the UTC reading.
        Assertions.assertEquals(
                time1.atZone(tz1.toZoneId())
                        .withZoneSameInstant(tz2.toZoneId())
                        .toLocalDateTime(),
                time2);
    } finally {
        // Undo the process-wide override regardless of the test outcome.
        TimeZone.setDefault(originalTimeZone);
    }
}

@Test
public void testParquetReadProjection1() throws Exception {
URL resource = ParquetReadStrategyTest.class.getResource("/timestamp_as_int96.parquet");
Expand Down

0 comments on commit b3e1351

Please sign in to comment.