Skip to content

Commit

Permalink
test: Added reference parquet files and tests for ObjectCodec impleme…
Browse files Browse the repository at this point in the history
…ntations (#6207)

Closes #5767
  • Loading branch information
malhotrashivam authored Oct 16, 2024
1 parent 98851b4 commit 9d5006f
Show file tree
Hide file tree
Showing 7 changed files with 144 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ public class TestCodecColumns {
VARIABLE_WIDTH_BIG_INTEGER_COLUMN_DEFINITION,
VARIABLE_WIDTH_BIG_INTEGER_COLUMN_DEFINITION_S);

private static final byte[] EMPTY_BYTE_ARRAY = new byte[] {};

@Rule
public final EngineCleanup base = new EngineCleanup();

Expand All @@ -85,12 +87,13 @@ public class TestCodecColumns {
@Before
public void setUp() {
table = TableTools.newTable(TABLE_DEFINITION,
TableTools.col("VWBA", new byte[] {0, 1, 2}, null, new byte[] {3, 4, 5, 6}),
TableTools.col("VWCD", null, new ArrayTuple(0, 2, 4, 6), new ArrayTuple(1, 3, 5, 7)),
TableTools.col("VWBA", new byte[] {0, 1, 2}, null, new byte[] {3, 4, 5, 6}, EMPTY_BYTE_ARRAY),
TableTools.col("VWCD", null, new ArrayTuple(0, 2, 4, 6), new ArrayTuple(1, 3, 5, 7), null),
TableTools.col("FWBA", new byte[] {7, 8, 9, 10, 11, 12, 13, 14, 15},
new byte[] {16, 17, 18, 19, 20, 21, 22, 23, 24}, new byte[] {0, 0, 0, 0, 0, 0, 0, 0, 0}),
TableTools.col("VWBI", BigInteger.valueOf(91), BigInteger.valueOf(111111111111111L), null),
TableTools.col("VWBIS", BigInteger.valueOf(94), null, BigInteger.valueOf(111111111111112L)));
new byte[] {16, 17, 18, 19, 20, 21, 22, 23, 24}, new byte[] {0, 0, 0, 0, 0, 0, 0, 0, 0},
EMPTY_BYTE_ARRAY),
TableTools.col("VWBI", BigInteger.valueOf(91), BigInteger.valueOf(111111111111111L), null, null),
TableTools.col("VWBIS", BigInteger.valueOf(94), null, BigInteger.valueOf(111111111111112L), null));
}

@Test
Expand All @@ -99,20 +102,32 @@ public void doColumnsTest() throws IOException {
final File dest = new File(dir, "Test.parquet");
try {
ParquetTools.writeTable(table, dest.getPath(), writeInstructions);
final MutableObject<ParquetInstructions> instructionsOut = new MutableObject<>();
final Table result =
ParquetTools.readParquetSchemaAndTable(dest, ParquetInstructions.EMPTY, instructionsOut);
TableTools.show(result);
TestCase.assertEquals(TABLE_DEFINITION, result.getDefinition());
final ParquetInstructions readInstructions = instructionsOut.getValue();
TestCase.assertTrue(
ParquetInstructions.sameColumnNamesAndCodecMappings(expectedReadInstructions, readInstructions));
TstUtils.assertTableEquals(table, result);
doColumnsTestHelper(dest);
} finally {
FileUtils.deleteRecursively(dir);
}
}

@Test
public void doLegacyColumnsTest() {
// Make sure that we can read legacy data encoded with the old codec implementations.
final String path =
TestCodecColumns.class.getResource("/ReferenceParquetWithCodecColumns.parquet").getFile();
doColumnsTestHelper(new File(path));
}

private void doColumnsTestHelper(final File dest) {
final MutableObject<ParquetInstructions> instructionsOut = new MutableObject<>();
final Table result =
ParquetTools.readParquetSchemaAndTable(dest, ParquetInstructions.EMPTY, instructionsOut);
TableTools.show(result);
TestCase.assertEquals(TABLE_DEFINITION, result.getDefinition());
final ParquetInstructions readInstructions = instructionsOut.getValue();
TestCase.assertTrue(
ParquetInstructions.sameColumnNamesAndCodecMappings(expectedReadInstructions, readInstructions));
TstUtils.assertTableEquals(table, result);
}

@Test
public void doCacheTest() {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import io.deephaven.engine.util.TableTools;
import io.deephaven.parquet.table.ParquetInstructions;
import io.deephaven.util.codec.*;
import junit.framework.TestCase;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
Expand Down Expand Up @@ -104,18 +104,30 @@ public void setUp() {
@Test
public void doColumnsTest() throws IOException {
final File dir = Files.createTempDirectory(Paths.get(""), "CODEC_TEST").toFile();
final File dest = new File(dir, "Table.parquet");
final String dest = new File(dir, "Table.parquet").getPath();
try {
ParquetTools.writeTable(table, dest.getPath(), writeInstructions);
final Table result = ParquetTools.readTable(dest.getPath());
TableTools.show(result);
TestCase.assertEquals(TABLE_DEFINITION, result.getDefinition());
TstUtils.assertTableEquals(table, result);
ParquetTools.writeTable(table, dest, writeInstructions);
doColumnsTestHelper(dest);
} finally {
FileUtils.deleteRecursively(dir);
}
}

@Test
public void doLegacyColumnsTest() {
// Make sure that we can read legacy data encoded with the old codec implementations.
final String dest =
TestMapCodecColumns.class.getResource("/ReferenceParquetWithMapCodecData.parquet").getFile();
doColumnsTestHelper(dest);
}

private void doColumnsTestHelper(final String dest) {
final Table result = ParquetTools.readTable(dest);
TableTools.show(result);
Assert.assertEquals(TABLE_DEFINITION, result.getDefinition());
TstUtils.assertTableEquals(table, result);
}

@SuppressWarnings({"unchecked"})
public static <K, V> Map<K, V> mapFromArray(Object... data) {
Map<K, V> map = new LinkedHashMap<K, V>();
Expand Down
Git LFS file not shown
Git LFS file not shown
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import io.deephaven.engine.util.TableTools;
import io.deephaven.engine.util.file.TrackedFileHandleFactory;
import io.deephaven.parquet.base.BigDecimalParquetBytesCodec;
import io.deephaven.parquet.base.BigIntegerParquetBytesCodec;
import io.deephaven.parquet.base.InvalidParquetFileException;
import io.deephaven.parquet.base.NullStatistics;
import io.deephaven.parquet.table.location.ParquetTableLocation;
Expand Down Expand Up @@ -121,6 +122,7 @@
import static io.deephaven.engine.util.TableTools.newTable;
import static io.deephaven.engine.util.TableTools.shortCol;
import static io.deephaven.engine.util.TableTools.stringCol;
import static io.deephaven.parquet.table.ParquetTableWriter.INDEX_ROW_SET_COLUMN_NAME;
import static io.deephaven.parquet.table.ParquetTools.readTable;
import static io.deephaven.parquet.table.ParquetTools.writeKeyValuePartitionedTable;
import static io.deephaven.parquet.table.ParquetTools.writeTable;
Expand Down Expand Up @@ -533,7 +535,8 @@ public void testLz4RawCompressed() {

// The following file is tagged as LZ4 compressed based on its metadata, but is actually compressed with
// LZ4_RAW. We should be able to read it anyway with no exceptions.
String path = TestParquetTools.class.getResource("/sample_lz4_compressed.parquet").getFile();
final String path =
ParquetTableReadWriteTest.class.getResource("/sample_lz4_compressed.parquet").getFile();
readTable(path, EMPTY.withLayout(ParquetInstructions.ParquetFileLayout.SINGLE_FILE)).select();
final File randomDest = new File(rootFile, "random.parquet");
writeTable(fromDisk, randomDest.getPath(), ParquetTools.LZ4_RAW);
Expand Down Expand Up @@ -1740,7 +1743,8 @@ public void testReadingParquetDataWithEmptyRowGroups() {
{
// Single parquet file with empty row group
final String path =
TestParquetTools.class.getResource("/ReferenceParquetWithEmptyRowGroup1.parquet").getFile();
ParquetTableReadWriteTest.class.getResource("/ReferenceParquetWithEmptyRowGroup1.parquet")
.getFile();
final Table fromDisk =
readTable(path, EMPTY.withLayout(ParquetInstructions.ParquetFileLayout.SINGLE_FILE)).select();
assertEquals(0, fromDisk.size());
Expand All @@ -1752,7 +1756,8 @@ public void testReadingParquetDataWithEmptyRowGroups() {
// is empty. To generate this file, the following branch was used:
// https://github.com/malhotrashivam/deephaven-core/tree/sm-ref-branch
final String path =
TestParquetTools.class.getResource("/ReferenceParquetWithEmptyRowGroup2.parquet").getFile();
ParquetTableReadWriteTest.class.getResource("/ReferenceParquetWithEmptyRowGroup2.parquet")
.getFile();
final Table fromDisk =
readTable(path, EMPTY.withLayout(ParquetInstructions.ParquetFileLayout.SINGLE_FILE)).select();
assertEquals(20, fromDisk.size());
Expand All @@ -1764,7 +1769,7 @@ public void testReadingParquetDataWithEmptyRowGroups() {
{
// Parquet dataset with three files, first and third file have three row groups, two non-empty followed by
// an empty row group, and second file has just one empty row group.
final String dirPath = TestParquetTools.class.getResource("/datasetWithEmptyRowgroups").getFile();
final String dirPath = ParquetTableReadWriteTest.class.getResource("/datasetWithEmptyRowgroups").getFile();
assertFalse(readTable(dirPath + "/file1.parquet").isEmpty());
assertTrue(readTable(dirPath + "/file2.parquet").isEmpty());
assertFalse(readTable(dirPath + "/file3.parquet").isEmpty());
Expand All @@ -1776,6 +1781,81 @@ public void testReadingParquetDataWithEmptyRowGroups() {
}
}

@Test
public void testReadingReferenceParquetDataWithCodec() {
{
final BigDecimalParquetBytesCodec bdCodec = new BigDecimalParquetBytesCodec(20, 1);
final BigIntegerParquetBytesCodec biCodec = new BigIntegerParquetBytesCodec();
ExecutionContext.getContext().getQueryScope().putParam("__bdCodec", bdCodec);
ExecutionContext.getContext().getQueryScope().putParam("__biCodec", biCodec);
final Table source = TableTools.emptyTable(10_000).update(
"LocalDateColumn = ii % 10 == 0 ? null : java.time.LocalDate.ofEpochDay(ii)",
"CompactLocalDateColumn = ii % 10 == 0 ? null : java.time.LocalDate.ofEpochDay(ii)",
"LocalTimeColumn = ii % 10 == 0 ? null : java.time.LocalTime.ofSecondOfDay(ii % 86400)",
"ZonedDateTimeColumn = ii % 10 == 0 ? null : java.time.ZonedDateTime.ofInstant(java.time.Instant.ofEpochSecond(ii), java.time.ZoneId.of(\"UTC\"))",
"StringColumn = ii % 10 == 0 ? null : java.lang.String.valueOf(ii)",
"BigDecimalColumn = ii % 10 == 0 ? null : ii % 2 == 0 ? java.math.BigDecimal.valueOf(ii).stripTrailingZeros() : java.math.BigDecimal.valueOf(-1 * ii).stripTrailingZeros()",
"BigDecimalColumnEncoded = ii % 10 == 0 ? null : __bdCodec.encode(BigDecimalColumn)",
"BigDecimalColumnDecoded = ii % 10 == 0 ? null : __bdCodec.decode(BigDecimalColumnEncoded, 0, BigDecimalColumnEncoded.length)",
"BigIntegerColumn = ii % 10 == 0 ? null : ii % 2 == 0 ? java.math.BigInteger.valueOf(ii*512) : java.math.BigInteger.valueOf(-1*ii*512)",
"BigIntegerColumnEncoded = ii % 10 == 0 ? null : __biCodec.encode(BigIntegerColumn)",
"BigIntegerColumnDecoded = ii % 10 == 0 ? null : __biCodec.decode(BigIntegerColumnEncoded, 0, BigIntegerColumnEncoded.length)");

// Set codecs for each column
final ParquetInstructions instructions = ParquetInstructions.builder()
.addColumnCodec("LocalDateColumn", "io.deephaven.util.codec.LocalDateCodec")
.addColumnCodec("CompactLocalDateColumn", "io.deephaven.util.codec.LocalDateCodec", "Compact")
.addColumnCodec("LocalTimeColumn", "io.deephaven.util.codec.LocalTimeCodec")
.addColumnCodec("ZonedDateTimeColumn", "io.deephaven.util.codec.ZonedDateTimeCodec")
.addColumnCodec("StringColumn", "io.deephaven.util.codec.UTF8StringAsByteArrayCodec")
.addColumnCodec("BigDecimalColumn", "io.deephaven.util.codec.BigDecimalCodec", "20,1,allowrounding")
.addColumnCodec("BigIntegerColumn", "io.deephaven.util.codec.BigIntegerCodec")
.build();

{
// Verify that we can write and read the table with codecs
final File dest = new File(rootFile, "ReferenceParquetWithCodecData.parquet");
ParquetTools.writeTable(source, dest.getPath(), instructions);
checkSingleTable(source, dest);
dest.delete();
}
{
// Verify that we can read the reference parquet file with these codecs
final String path =
ParquetTableReadWriteTest.class.getResource("/ReferenceParquetWithCodecData.parquet").getFile();
final Table fromDisk = readParquetFileFromGitLFS(new File(path));
assertTableEquals(source, fromDisk);
}
}

{
// Repeat similar tests for RowSetCodec
final Table source = TableTools.emptyTable(10_000).updateView(
"A = (int)(ii%3)",
"B = (double)(ii%2)",
"C = ii");
final DataIndex dataIndex = DataIndexer.getOrCreateDataIndex(source, "A", "B");
final File destFile = new File(rootFile, "ReferenceParquetWithRowsetCodecData.parquet");
final Table indexTable = dataIndex.table();
final ParquetInstructions instructions = ParquetInstructions.builder()
.addColumnCodec(INDEX_ROW_SET_COLUMN_NAME, "io.deephaven.engine.table.impl.dataindex.RowSetCodec")
.build();
{
writeTable(indexTable, destFile.getPath(), instructions);
final Table fromDisk = readTable(destFile.getPath());
assertTableEquals(indexTable, fromDisk);
destFile.delete();
}
{
final String path =
ParquetTableReadWriteTest.class.getResource("/ReferenceParquetWithRowsetCodecData.parquet")
.getFile();
final Table fromDiskWithCodec = readParquetFileFromGitLFS(new File(path));
assertTableEquals(indexTable, fromDiskWithCodec);
}
}
}

@Test
public void decimalLogicalTypeTest() {
final Table expected = TableTools.emptyTable(100_000).update(
Expand Down
Git LFS file not shown
Git LFS file not shown

0 comments on commit 9d5006f

Please sign in to comment.