test: Added reference parquet files and tests for ObjectCodec implementations #6207

Merged
merged 2 commits on Oct 16, 2024
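For context on what these tests exercise, here is a rough sketch of the write/read round trip through a column codec. It only uses calls that appear in this diff (ParquetInstructions.builder().addColumnCodec, ParquetTools.writeTable, ParquetTools.readTable, TableTools.col); the column name, codec choice, and output path are illustrative, and a real run also needs an initialized Deephaven engine context such as the EngineCleanup rule used in these tests.

import io.deephaven.engine.table.Table;
import io.deephaven.engine.util.TableTools;
import io.deephaven.parquet.table.ParquetInstructions;
import io.deephaven.parquet.table.ParquetTools;

import java.math.BigInteger;

// Build a small table with a column that has no native parquet mapping.
final Table source = TableTools.newTable(
        TableTools.col("VWBI", BigInteger.valueOf(91), null, BigInteger.valueOf(111111111111111L)));

// Map the column onto an ObjectCodec implementation by class name when writing.
final ParquetInstructions writeInstructions = ParquetInstructions.builder()
        .addColumnCodec("VWBI", "io.deephaven.util.codec.BigIntegerCodec")
        .build();
ParquetTools.writeTable(source, "/tmp/codec-sketch.parquet", writeInstructions);

// On read back, the codec mapping is recovered from the file metadata, so plain
// readTable returns the original values (this is what the tests below assert).
final Table roundTrip = ParquetTools.readTable("/tmp/codec-sketch.parquet");
TableTools.show(roundTrip);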
TestCodecColumns.java
@@ -77,6 +77,8 @@ public class TestCodecColumns {
VARIABLE_WIDTH_BIG_INTEGER_COLUMN_DEFINITION,
VARIABLE_WIDTH_BIG_INTEGER_COLUMN_DEFINITION_S);

private static final byte[] EMPTY_BYTE_ARRAY = new byte[] {};

@Rule
public final EngineCleanup base = new EngineCleanup();

@@ -85,12 +87,13 @@ public class TestCodecColumns {
@Before
public void setUp() {
table = TableTools.newTable(TABLE_DEFINITION,
TableTools.col("VWBA", new byte[] {0, 1, 2}, null, new byte[] {3, 4, 5, 6}),
TableTools.col("VWCD", null, new ArrayTuple(0, 2, 4, 6), new ArrayTuple(1, 3, 5, 7)),
TableTools.col("VWBA", new byte[] {0, 1, 2}, null, new byte[] {3, 4, 5, 6}, EMPTY_BYTE_ARRAY),
TableTools.col("VWCD", null, new ArrayTuple(0, 2, 4, 6), new ArrayTuple(1, 3, 5, 7), null),
TableTools.col("FWBA", new byte[] {7, 8, 9, 10, 11, 12, 13, 14, 15},
new byte[] {16, 17, 18, 19, 20, 21, 22, 23, 24}, new byte[] {0, 0, 0, 0, 0, 0, 0, 0, 0}),
TableTools.col("VWBI", BigInteger.valueOf(91), BigInteger.valueOf(111111111111111L), null),
TableTools.col("VWBIS", BigInteger.valueOf(94), null, BigInteger.valueOf(111111111111112L)));
new byte[] {16, 17, 18, 19, 20, 21, 22, 23, 24}, new byte[] {0, 0, 0, 0, 0, 0, 0, 0, 0},
EMPTY_BYTE_ARRAY),
TableTools.col("VWBI", BigInteger.valueOf(91), BigInteger.valueOf(111111111111111L), null, null),
TableTools.col("VWBIS", BigInteger.valueOf(94), null, BigInteger.valueOf(111111111111112L), null));
}

@Test
@@ -99,20 +102,32 @@ public void doColumnsTest() throws IOException {
final File dest = new File(dir, "Test.parquet");
try {
ParquetTools.writeTable(table, dest.getPath(), writeInstructions);
final MutableObject<ParquetInstructions> instructionsOut = new MutableObject<>();
final Table result =
ParquetTools.readParquetSchemaAndTable(dest, ParquetInstructions.EMPTY, instructionsOut);
TableTools.show(result);
TestCase.assertEquals(TABLE_DEFINITION, result.getDefinition());
final ParquetInstructions readInstructions = instructionsOut.getValue();
TestCase.assertTrue(
ParquetInstructions.sameColumnNamesAndCodecMappings(expectedReadInstructions, readInstructions));
TstUtils.assertTableEquals(table, result);
doColumnsTestHelper(dest);
} finally {
FileUtils.deleteRecursively(dir);
}
}

@Test
public void doLegacyColumnsTest() {
// Make sure that we can read legacy data encoded with the old codec implementations.
final String path =
TestCodecColumns.class.getResource("/ReferenceParquetWithCodecColumns.parquet").getFile();
doColumnsTestHelper(new File(path));
}

private void doColumnsTestHelper(final File dest) {
final MutableObject<ParquetInstructions> instructionsOut = new MutableObject<>();
final Table result =
ParquetTools.readParquetSchemaAndTable(dest, ParquetInstructions.EMPTY, instructionsOut);
TableTools.show(result);
TestCase.assertEquals(TABLE_DEFINITION, result.getDefinition());
final ParquetInstructions readInstructions = instructionsOut.getValue();
TestCase.assertTrue(
ParquetInstructions.sameColumnNamesAndCodecMappings(expectedReadInstructions, readInstructions));
TstUtils.assertTableEquals(table, result);
}

@Test
public void doCacheTest() {
try {
TestMapCodecColumns.java
@@ -13,7 +13,7 @@
import io.deephaven.engine.util.TableTools;
import io.deephaven.parquet.table.ParquetInstructions;
import io.deephaven.util.codec.*;
import junit.framework.TestCase;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@@ -104,18 +104,30 @@ public void setUp() {
@Test
public void doColumnsTest() throws IOException {
final File dir = Files.createTempDirectory(Paths.get(""), "CODEC_TEST").toFile();
final File dest = new File(dir, "Table.parquet");
final String dest = new File(dir, "Table.parquet").getPath();
try {
ParquetTools.writeTable(table, dest.getPath(), writeInstructions);
final Table result = ParquetTools.readTable(dest.getPath());
TableTools.show(result);
TestCase.assertEquals(TABLE_DEFINITION, result.getDefinition());
TstUtils.assertTableEquals(table, result);
ParquetTools.writeTable(table, dest, writeInstructions);
doColumnsTestHelper(dest);
} finally {
FileUtils.deleteRecursively(dir);
}
}

@Test
public void doLegacyColumnsTest() {
// Make sure that we can read legacy data encoded with the old codec implementations.
final String dest =
TestMapCodecColumns.class.getResource("/ReferenceParquetWithMapCodecData.parquet").getFile();
doColumnsTestHelper(dest);
}

private void doColumnsTestHelper(final String dest) {
final Table result = ParquetTools.readTable(dest);
TableTools.show(result);
Assert.assertEquals(TABLE_DEFINITION, result.getDefinition());
TstUtils.assertTableEquals(table, result);
}

@SuppressWarnings({"unchecked"})
public static <K, V> Map<K, V> mapFromArray(Object... data) {
Map<K, V> map = new LinkedHashMap<K, V>();
Git LFS file not shown
Git LFS file not shown
ParquetTableReadWriteTest.java
@@ -45,6 +45,7 @@
import io.deephaven.engine.util.TableTools;
import io.deephaven.engine.util.file.TrackedFileHandleFactory;
import io.deephaven.parquet.base.BigDecimalParquetBytesCodec;
import io.deephaven.parquet.base.BigIntegerParquetBytesCodec;
import io.deephaven.parquet.base.InvalidParquetFileException;
import io.deephaven.parquet.base.NullStatistics;
import io.deephaven.parquet.table.location.ParquetTableLocation;
@@ -121,6 +122,7 @@
import static io.deephaven.engine.util.TableTools.newTable;
import static io.deephaven.engine.util.TableTools.shortCol;
import static io.deephaven.engine.util.TableTools.stringCol;
import static io.deephaven.parquet.table.ParquetTableWriter.INDEX_ROW_SET_COLUMN_NAME;
import static io.deephaven.parquet.table.ParquetTools.readTable;
import static io.deephaven.parquet.table.ParquetTools.writeKeyValuePartitionedTable;
import static io.deephaven.parquet.table.ParquetTools.writeTable;
@@ -533,7 +535,8 @@ public void testLz4RawCompressed() {

// The following file is tagged as LZ4 compressed based on its metadata, but is actually compressed with
// LZ4_RAW. We should be able to read it anyway with no exceptions.
String path = TestParquetTools.class.getResource("/sample_lz4_compressed.parquet").getFile();
final String path =
ParquetTableReadWriteTest.class.getResource("/sample_lz4_compressed.parquet").getFile();
readTable(path, EMPTY.withLayout(ParquetInstructions.ParquetFileLayout.SINGLE_FILE)).select();
final File randomDest = new File(rootFile, "random.parquet");
writeTable(fromDisk, randomDest.getPath(), ParquetTools.LZ4_RAW);
@@ -1740,7 +1743,8 @@ public void testReadingParquetDataWithEmptyRowGroups() {
{
// Single parquet file with empty row group
final String path =
TestParquetTools.class.getResource("/ReferenceParquetWithEmptyRowGroup1.parquet").getFile();
ParquetTableReadWriteTest.class.getResource("/ReferenceParquetWithEmptyRowGroup1.parquet")
.getFile();
final Table fromDisk =
readTable(path, EMPTY.withLayout(ParquetInstructions.ParquetFileLayout.SINGLE_FILE)).select();
assertEquals(0, fromDisk.size());
@@ -1752,7 +1756,8 @@
// is empty. To generate this file, the following branch was used:
// https://github.com/malhotrashivam/deephaven-core/tree/sm-ref-branch
final String path =
TestParquetTools.class.getResource("/ReferenceParquetWithEmptyRowGroup2.parquet").getFile();
ParquetTableReadWriteTest.class.getResource("/ReferenceParquetWithEmptyRowGroup2.parquet")
.getFile();
final Table fromDisk =
readTable(path, EMPTY.withLayout(ParquetInstructions.ParquetFileLayout.SINGLE_FILE)).select();
assertEquals(20, fromDisk.size());
@@ -1764,7 +1769,7 @@ public void testReadingParquetDataWithEmptyRowGroups() {
{
// Parquet dataset with three files, first and third file have three row groups, two non-empty followed by
// an empty row group, and second file has just one empty row group.
final String dirPath = TestParquetTools.class.getResource("/datasetWithEmptyRowgroups").getFile();
final String dirPath = ParquetTableReadWriteTest.class.getResource("/datasetWithEmptyRowgroups").getFile();
assertFalse(readTable(dirPath + "/file1.parquet").isEmpty());
assertTrue(readTable(dirPath + "/file2.parquet").isEmpty());
assertFalse(readTable(dirPath + "/file3.parquet").isEmpty());
@@ -1776,6 +1781,81 @@
}
}

@Test
public void testReadingReferenceParquetDataWithCodec() {
{
final BigDecimalParquetBytesCodec bdCodec = new BigDecimalParquetBytesCodec(20, 1);
final BigIntegerParquetBytesCodec biCodec = new BigIntegerParquetBytesCodec();
ExecutionContext.getContext().getQueryScope().putParam("__bdCodec", bdCodec);
ExecutionContext.getContext().getQueryScope().putParam("__biCodec", biCodec);
final Table source = TableTools.emptyTable(10_000).update(
"LocalDateColumn = ii % 10 == 0 ? null : java.time.LocalDate.ofEpochDay(ii)",
"CompactLocalDateColumn = ii % 10 == 0 ? null : java.time.LocalDate.ofEpochDay(ii)",
"LocalTimeColumn = ii % 10 == 0 ? null : java.time.LocalTime.ofSecondOfDay(ii % 86400)",
"ZonedDateTimeColumn = ii % 10 == 0 ? null : java.time.ZonedDateTime.ofInstant(java.time.Instant.ofEpochSecond(ii), java.time.ZoneId.of(\"UTC\"))",
"StringColumn = ii % 10 == 0 ? null : java.lang.String.valueOf(ii)",
"BigDecimalColumn = ii % 10 == 0 ? null : ii % 2 == 0 ? java.math.BigDecimal.valueOf(ii).stripTrailingZeros() : java.math.BigDecimal.valueOf(-1 * ii).stripTrailingZeros()",
"BigDecimalColumnEncoded = ii % 10 == 0 ? null : __bdCodec.encode(BigDecimalColumn)",
"BigDecimalColumnDecoded = ii % 10 == 0 ? null : __bdCodec.decode(BigDecimalColumnEncoded, 0, BigDecimalColumnEncoded.length)",
"BigIntegerColumn = ii % 10 == 0 ? null : ii % 2 == 0 ? java.math.BigInteger.valueOf(ii*512) : java.math.BigInteger.valueOf(-1*ii*512)",
"BigIntegerColumnEncoded = ii % 10 == 0 ? null : __biCodec.encode(BigIntegerColumn)",
"BigIntegerColumnDecoded = ii % 10 == 0 ? null : __biCodec.decode(BigIntegerColumnEncoded, 0, BigIntegerColumnEncoded.length)");

// Set codecs for each column
final ParquetInstructions instructions = ParquetInstructions.builder()
.addColumnCodec("LocalDateColumn", "io.deephaven.util.codec.LocalDateCodec")
.addColumnCodec("CompactLocalDateColumn", "io.deephaven.util.codec.LocalDateCodec", "Compact")
.addColumnCodec("LocalTimeColumn", "io.deephaven.util.codec.LocalTimeCodec")
.addColumnCodec("ZonedDateTimeColumn", "io.deephaven.util.codec.ZonedDateTimeCodec")
.addColumnCodec("StringColumn", "io.deephaven.util.codec.UTF8StringAsByteArrayCodec")
.addColumnCodec("BigDecimalColumn", "io.deephaven.util.codec.BigDecimalCodec", "20,1,allowrounding")
.addColumnCodec("BigIntegerColumn", "io.deephaven.util.codec.BigIntegerCodec")
.build();

{
// Verify that we can write and read the table with codecs
final File dest = new File(rootFile, "ReferenceParquetWithCodecData.parquet");
ParquetTools.writeTable(source, dest.getPath(), instructions);
checkSingleTable(source, dest);
dest.delete();
}
{
// Verify that we can read the reference parquet file with these codecs
final String path =
ParquetTableReadWriteTest.class.getResource("/ReferenceParquetWithCodecData.parquet").getFile();
final Table fromDisk = readParquetFileFromGitLFS(new File(path));
assertTableEquals(source, fromDisk);
}
}

{
// Repeat similar tests for RowSetCodec
final Table source = TableTools.emptyTable(10_000).updateView(
"A = (int)(ii%3)",
"B = (double)(ii%2)",
"C = ii");
final DataIndex dataIndex = DataIndexer.getOrCreateDataIndex(source, "A", "B");
final File destFile = new File(rootFile, "ReferenceParquetWithRowsetCodecData.parquet");
final Table indexTable = dataIndex.table();
final ParquetInstructions instructions = ParquetInstructions.builder()
.addColumnCodec(INDEX_ROW_SET_COLUMN_NAME, "io.deephaven.engine.table.impl.dataindex.RowSetCodec")
.build();
{
writeTable(indexTable, destFile.getPath(), instructions);
final Table fromDisk = readTable(destFile.getPath());
assertTableEquals(indexTable, fromDisk);
destFile.delete();
}
{
final String path =
ParquetTableReadWriteTest.class.getResource("/ReferenceParquetWithRowsetCodecData.parquet")
.getFile();
final Table fromDiskWithCodec = readParquetFileFromGitLFS(new File(path));
assertTableEquals(indexTable, fromDiskWithCodec);
}
}
}

@Test
public void decimalLogicalTypeTest() {
final Table expected = TableTools.emptyTable(100_000).update(
Git LFS file not shown
Git LFS file not shown