diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTools.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTools.java index 774d816d897..ba3b03fc04a 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTools.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTools.java @@ -10,6 +10,7 @@ import io.deephaven.base.Pair; import io.deephaven.base.verify.Require; import io.deephaven.engine.context.ExecutionContext; +import io.deephaven.engine.liveness.LivenessScopeStack; import io.deephaven.engine.primitive.iterator.CloseableIterator; import io.deephaven.engine.table.ColumnDefinition; import io.deephaven.engine.table.ColumnSource; @@ -22,6 +23,7 @@ import io.deephaven.engine.updategraph.UpdateSourceRegistrar; import io.deephaven.parquet.base.ParquetMetadataFileWriter; import io.deephaven.parquet.base.NullParquetMetadataFileWriter; +import io.deephaven.util.SafeCloseable; import io.deephaven.util.channel.SeekableChannelsProvider; import io.deephaven.util.channel.SeekableChannelsProviderLoader; import io.deephaven.util.channel.SeekableChannelsProviderPlugin; @@ -88,6 +90,7 @@ public class ParquetTools { private static final int MAX_PARTITIONING_LEVELS_INFERENCE = 32; + private static final String[][] EMPTY_INDEXES = new String[0][]; private ParquetTools() {} @@ -506,7 +509,9 @@ private static List indexInfoBuilderHelper( * written as "key=value" format in a nested directory structure. To generate these individual partitions, this * method will call {@link Table#partitionBy(String...) partitionBy} on all the partitioning columns of provided * table. The generated parquet files will have names of the format provided by - * {@link ParquetInstructions#baseNameForPartitionedParquetData()}. + * {@link ParquetInstructions#baseNameForPartitionedParquetData()}. Any indexing columns present on the source table + * will be written as sidecar tables. To write only a subset of the indexes or add additional indexes while writing, + * use {@link #writeKeyValuePartitionedTable(Table, String, ParquetInstructions, String[][])}. * * @param sourceTable The table to partition and write * @param destinationDir The path to destination root directory to store partitioned data in nested format. @@ -517,7 +522,31 @@ public static void writeKeyValuePartitionedTable(@NotNull final Table sourceTabl @NotNull final String destinationDir, @NotNull final ParquetInstructions writeInstructions) { writeKeyValuePartitionedTable(sourceTable, sourceTable.getDefinition(), destinationDir, - writeInstructions); + writeInstructions, indexedColumnNames(sourceTable)); + } + + /** + * Write table to disk in parquet format with {@link TableDefinition#getPartitioningColumns() partitioning columns} + * written as "key=value" format in a nested directory structure. To generate these individual partitions, this + * method will call {@link Table#partitionBy(String...) partitionBy} on all the partitioning columns of provided + * table. The generated parquet files will have names of the format provided by + * {@link ParquetInstructions#baseNameForPartitionedParquetData()}. + * + * @param sourceTable The table to partition and write + * @param destinationDir The path to destination root directory to store partitioned data in nested format. + * Non-existing directories are created. 
+ * @param writeInstructions Write instructions for customizations while writing + * @param indexColumnArr Arrays containing the column names for indexes to persist. The write operation will store + * the index info as sidecar tables. This argument is used to narrow the set of indexes to write, or to be + * explicit about the expected set of indexes present on all sources. Indexes that are specified but missing + * will be computed on demand. + */ + public static void writeKeyValuePartitionedTable(@NotNull final Table sourceTable, + @NotNull final String destinationDir, + @NotNull final ParquetInstructions writeInstructions, + @Nullable final String[][] indexColumnArr) { + writeKeyValuePartitionedTable(sourceTable, sourceTable.getDefinition(), destinationDir, + writeInstructions, indexColumnArr); } /** @@ -525,7 +554,9 @@ public static void writeKeyValuePartitionedTable(@NotNull final Table sourceTabl * written as "key=value" format in a nested directory structure. To generate these individual partitions, this * method will call {@link Table#partitionBy(String...) partitionBy} on all the partitioning columns in the provided * table definition. The generated parquet files will have names of the format provided by - * {@link ParquetInstructions#baseNameForPartitionedParquetData()}. + * {@link ParquetInstructions#baseNameForPartitionedParquetData()}. Any indexing columns present on the source table + * will be written as sidecar tables. To write only a subset of the indexes or add additional indexes while writing, + * use {@link #writeKeyValuePartitionedTable(Table, TableDefinition, String, ParquetInstructions, String[][])}. * * @param sourceTable The table to partition and write * @param definition table definition to use (instead of the one implied by the table itself) @@ -537,6 +568,33 @@ public static void writeKeyValuePartitionedTable(@NotNull final Table sourceTabl @NotNull final TableDefinition definition, @NotNull final String destinationDir, @NotNull final ParquetInstructions writeInstructions) { + writeKeyValuePartitionedTable(sourceTable, definition, destinationDir, writeInstructions, + indexedColumnNames(sourceTable)); + + } + + /** + * Write table to disk in parquet format with {@link TableDefinition#getPartitioningColumns() partitioning columns} + * written as "key=value" format in a nested directory structure. To generate these individual partitions, this + * method will call {@link Table#partitionBy(String...) partitionBy} on all the partitioning columns in the provided + * table definition. The generated parquet files will have names of the format provided by + * {@link ParquetInstructions#baseNameForPartitionedParquetData()}. + * + * @param sourceTable The table to partition and write + * @param definition table definition to use (instead of the one implied by the table itself) + * @param destinationDir The path to destination root directory to store partitioned data in nested format. + * Non-existing directories are created. + * @param writeInstructions Write instructions for customizations while writing + * @param indexColumnArr Arrays containing the column names for indexes to persist. The write operation will store + * the index info as sidecar tables. This argument is used to narrow the set of indexes to write, or to be + * explicit about the expected set of indexes present on all sources. Indexes that are specified but missing + * will be computed on demand. 
+ */ + public static void writeKeyValuePartitionedTable(@NotNull final Table sourceTable, + @NotNull final TableDefinition definition, + @NotNull final String destinationDir, + @NotNull final ParquetInstructions writeInstructions, + @Nullable final String[][] indexColumnArr) { final List> partitioningColumns = definition.getPartitioningColumns(); if (partitioningColumns.isEmpty()) { throw new IllegalArgumentException("Table must have partitioning columns to write partitioned data"); @@ -549,7 +607,7 @@ public static void writeKeyValuePartitionedTable(@NotNull final Table sourceTabl final TableDefinition leafDefinition = getNonKeyTableDefinition(new HashSet<>(Arrays.asList(partitioningColNames)), definition); writeKeyValuePartitionedTableImpl(partitionedTable, keyTableDefinition, leafDefinition, destinationDir, - writeInstructions, Optional.of(sourceTable)); + writeInstructions, indexColumnArr, Optional.of(sourceTable)); } /** @@ -566,12 +624,34 @@ public static void writeKeyValuePartitionedTable(@NotNull final Table sourceTabl public static void writeKeyValuePartitionedTable(@NotNull final PartitionedTable partitionedTable, @NotNull final String destinationDir, @NotNull final ParquetInstructions writeInstructions) { + writeKeyValuePartitionedTable(partitionedTable, destinationDir, writeInstructions, EMPTY_INDEXES); + } + + /** + * Write a partitioned table to disk in parquet format with all the {@link PartitionedTable#keyColumnNames() key + * columns} as "key=value" format in a nested directory structure. To generate the partitioned table, users can call + * {@link Table#partitionBy(String...) partitionBy} on the required columns. The generated parquet files will have + * names of the format provided by {@link ParquetInstructions#baseNameForPartitionedParquetData()}. + * + * @param partitionedTable The partitioned table to write + * @param destinationDir The path to destination root directory to store partitioned data in nested format. + * Non-existing directories are created. + * @param writeInstructions Write instructions for customizations while writing + * @param indexColumnArr Arrays containing the column names for indexes to persist. The write operation will store + * the index info as sidecar tables. This argument is used to narrow the set of indexes to write, or to be + * explicit about the expected set of indexes present on all sources. Indexes that are specified but missing + * will be computed on demand. 
+ */ + public static void writeKeyValuePartitionedTable(@NotNull final PartitionedTable partitionedTable, + @NotNull final String destinationDir, + @NotNull final ParquetInstructions writeInstructions, + @Nullable final String[][] indexColumnArr) { final TableDefinition keyTableDefinition = getKeyTableDefinition(partitionedTable.keyColumnNames(), partitionedTable.table().getDefinition()); final TableDefinition leafDefinition = getNonKeyTableDefinition(partitionedTable.keyColumnNames(), partitionedTable.constituentDefinition()); writeKeyValuePartitionedTableImpl(partitionedTable, keyTableDefinition, leafDefinition, destinationDir, - writeInstructions, Optional.empty()); + writeInstructions, indexColumnArr, Optional.empty()); } /** @@ -590,10 +670,34 @@ public static void writeKeyValuePartitionedTable(@NotNull final PartitionedTable @NotNull final TableDefinition definition, @NotNull final String destinationDir, @NotNull final ParquetInstructions writeInstructions) { + writeKeyValuePartitionedTable(partitionedTable, definition, destinationDir, writeInstructions, EMPTY_INDEXES); + } + + /** + * Write a partitioned table to disk in parquet format with all the {@link PartitionedTable#keyColumnNames() key + * columns} as "key=value" format in a nested directory structure. To generate the partitioned table, users can call + * {@link Table#partitionBy(String...) partitionBy} on the required columns. The generated parquet files will have + * names of the format provided by {@link ParquetInstructions#baseNameForPartitionedParquetData()}. + * + * @param partitionedTable The partitioned table to write + * @param definition table definition to use (instead of the one implied by the table itself) + * @param destinationDir The path to destination root directory to store partitioned data in nested format. + * Non-existing directories are created. + * @param writeInstructions Write instructions for customizations while writing + * @param indexColumnArr Arrays containing the column names for indexes to persist. The write operation will store + * the index info as sidecar tables. This argument is used to narrow the set of indexes to write, or to be + * explicit about the expected set of indexes present on all sources. Indexes that are specified but missing + * will be computed on demand. + */ + public static void writeKeyValuePartitionedTable(@NotNull final PartitionedTable partitionedTable, + @NotNull final TableDefinition definition, + @NotNull final String destinationDir, + @NotNull final ParquetInstructions writeInstructions, + @NotNull final String[][] indexColumnArr) { final TableDefinition keyTableDefinition = getKeyTableDefinition(partitionedTable.keyColumnNames(), definition); final TableDefinition leafDefinition = getNonKeyTableDefinition(partitionedTable.keyColumnNames(), definition); writeKeyValuePartitionedTableImpl(partitionedTable, keyTableDefinition, leafDefinition, destinationDir, - writeInstructions, Optional.empty()); + writeInstructions, indexColumnArr, Optional.empty()); } /** @@ -605,6 +709,10 @@ public static void writeKeyValuePartitionedTable(@NotNull final PartitionedTable * @param leafDefinition The definition for leaf parquet files to be written * @param destinationRoot The path to destination root directory to store partitioned data in nested format * @param writeInstructions Write instructions for customizations while writing + * @param indexColumnArr Arrays containing the column names for indexes to persist. The write operation will store + * the index info as sidecar tables. 
This argument is used to narrow the set of indexes to write, or to be + * explicit about the expected set of indexes present on all sources. Indexes that are specified but missing + * will be computed on demand. * @param sourceTable The optional source table, provided when user provides a merged source table to write, like in * {@link #writeKeyValuePartitionedTable(Table, String, ParquetInstructions)} and * {@link #writeKeyValuePartitionedTable(Table, TableDefinition, String, ParquetInstructions)} @@ -614,6 +722,7 @@ private static void writeKeyValuePartitionedTableImpl(@NotNull final Partitioned @NotNull final TableDefinition leafDefinition, @NotNull final String destinationRoot, @NotNull final ParquetInstructions writeInstructions, + @Nullable final String[][] indexColumnArr, @NotNull final Optional sourceTable) { if (leafDefinition.numColumns() == 0) { throw new IllegalArgumentException("Cannot write a partitioned parquet table without any non-partitioning " @@ -692,10 +801,24 @@ private static void writeKeyValuePartitionedTableImpl(@NotNull final Partitioned final Map> computedCache = buildComputedCache(() -> sourceTable.orElseGet(partitionedTable::merge), leafDefinition); final Table[] partitionedDataArray = partitionedData.toArray(Table[]::new); - // TODO Verify correctness with Ryan/Larry - writeParquetTablesImpl(partitionedDataArray, leafDefinition, writeInstructions, - destinations.toArray(File[]::new), indexedColumnNames(partitionedDataArray), partitioningColumnsSchema, - new File(destinationRoot), computedCache); + try (final SafeCloseable ignored = LivenessScopeStack.open()) { + // TODO(deephaven-core#5292): Optimize creating index on constituent tables + addIndexesToTables(partitionedDataArray, indexColumnArr); + writeParquetTablesImpl(partitionedDataArray, leafDefinition, writeInstructions, + destinations.toArray(File[]::new), indexColumnArr, partitioningColumnsSchema, + new File(destinationRoot), computedCache); + } + } + + private static void addIndexesToTables(@NotNull final Table[] tables, + @Nullable final String[][] indexColumnArr) { + if (indexColumnArr != null && indexColumnArr.length != 0) { + for (final Table table : tables) { + for (final String[] indexCols : indexColumnArr) { + DataIndexer.getOrCreateDataIndex(table, indexCols); + } + } + } } /** @@ -939,22 +1062,32 @@ private static void writeParquetTablesImpl(@NotNull final Table[] sources, * @implNote This only examines the first source table. The writing code will compute missing indexes for the other * source tables. */ - private static String[][] indexedColumnNames(@NotNull Table @NotNull [] sources) { + @NotNull + private static String[][] indexedColumnNames(@NotNull final Table @NotNull [] sources) { if (sources.length == 0) { - return new String[0][]; + return EMPTY_INDEXES; } - // Use the first table as the source of indexed columns - final Table firstTable = sources[0]; - final DataIndexer dataIndexer = DataIndexer.existingOf(firstTable.getRowSet()); + return indexedColumnNames(sources[0]); + } + + /** + * Examine the source table to retrieve the list of indexes as String[] arrays. + * + * @param source The table from which to retrieve the indexes + * @return An array containing the indexes as String[] arrays. 
+ */ + @NotNull + private static String[][] indexedColumnNames(@NotNull final Table source) { + final DataIndexer dataIndexer = DataIndexer.existingOf(source.getRowSet()); if (dataIndexer == null) { - return new String[0][]; + return EMPTY_INDEXES; } final List dataIndexes = dataIndexer.dataIndexes(true); if (dataIndexes.isEmpty()) { - return new String[0][]; + return EMPTY_INDEXES; } - final Map> nameToColumn = firstTable.getColumnSourceMap(); + final Map> nameToColumn = source.getColumnSourceMap(); // We disregard collisions, here; any mapped name is an adequate choice. final Map, String> columnToName = nameToColumn.entrySet().stream() .collect(Collectors.toMap(Map.Entry::getValue, Map.Entry::getKey)); diff --git a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java index 072e901548f..5ad5348025d 100644 --- a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java +++ b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java @@ -675,7 +675,6 @@ public void writeKeyValuePartitionedDataWithIntegerPartitionsTest() { "I = ii")) .withDefinitionUnsafe(definition); DataIndexer.getOrCreateDataIndex(indexedtable, "I"); - // TODO verify correctness of the indexing data final File parentDir = new File(rootFile, "writeKeyValuePartitionedDataTest"); final ParquetInstructions writeInstructions = ParquetInstructions.builder() @@ -688,17 +687,20 @@ public void writeKeyValuePartitionedDataWithIntegerPartitionsTest() { assertTrue(new File(parentDir, "_common_metadata").exists()); assertTrue(new File(parentDir, "_metadata").exists()); - // Verify that the partitioned data exists + // Verify that the partitioning and indexing data exists for (int PC1 = 0; PC1 <= 2; PC1++) { for (int PC2 = 0; PC2 <= 1; PC2++) { final File dir = new File(parentDir, "PC1=" + PC1 + File.separator + "PC2=" + PC2); assertTrue(dir.exists() && dir.isDirectory()); final File dataFile = new File(dir, "data.parquet"); assertTrue(dataFile.exists() && dataFile.isFile()); + final File indexFile = new File(dir, ".dh_metadata/indexes/I/index_I_data.parquet"); + assertTrue(indexFile.exists() && indexFile.isFile()); } } final Table fromDisk = readKeyValuePartitionedTable(parentDir, EMPTY); + verifyIndexingInfoExists(fromDisk, "I"); fromDisk.where("I == 3").select(); assertTableEquals(indexedtable.sort("PC1", "PC2"), fromDisk.sort("PC1", "PC2")); @@ -715,7 +717,7 @@ public void writeKeyValuePartitionedDataWithIntegerPartitionsTest() { assertFalse(new File(parentDirWithoutMetadata, "_common_metadata").exists()); assertFalse(new File(parentDirWithoutMetadata, "_metadata").exists()); - // Verify that the partitioned data exists + // Verify that the partitioning and indexing data exists for (int PC1 = 0; PC1 <= 2; PC1++) { for (int PC2 = 0; PC2 <= 1; PC2++) { final File dir = new File(parentDirWithoutMetadata, "PC1=" + PC1 + File.separator + "PC2=" + PC2); @@ -723,12 +725,14 @@ public void writeKeyValuePartitionedDataWithIntegerPartitionsTest() { final File[] fileList = dir.listFiles(); for (final File dataFile : fileList) { // hidden indexing data - assertTrue(dataFile.isHidden() || dataFile.getName().endsWith(".parquet")); + assertTrue(dataFile.getName().equals(".dh_metadata") + || dataFile.getName().endsWith(".parquet")); } } } final Table fromDiskWithoutMetadata = readKeyValuePartitionedTable(parentDirWithoutMetadata, 
EMPTY); assertTableEquals(fromDisk, fromDiskWithoutMetadata); + verifyIndexingInfoExists(fromDiskWithoutMetadata, "I"); } @Test @@ -1070,26 +1074,57 @@ public void testAllNonPartitioningColumnTypes() { writeKeyValuePartitionedTable(inputData, parentDir.getPath(), writeInstructions); // Store the big decimal with the precision and scale consistent with what we write to parquet - inputData = maybeFixBigDecimal(inputData); + final Table bigDecimalFixedInputData = maybeFixBigDecimal(inputData); final String[] partitioningColumns = definition.getPartitioningColumns().stream() .map(ColumnDefinition::getName).toArray(String[]::new); - Table fromDisk = readKeyValuePartitionedTable(parentDir, EMPTY).select(); - assertTableEquals(inputData.sort(partitioningColumns), fromDisk.sort(partitioningColumns)); - Table fromDiskWithMetadata = readTable(new File(parentDir, "_common_metadata")); - assertTableEquals(inputData.sort(partitioningColumns), fromDiskWithMetadata.sort(partitioningColumns)); + { + final Table fromDisk = readKeyValuePartitionedTable(parentDir, EMPTY).select(); + assertTableEquals(bigDecimalFixedInputData.sort(partitioningColumns), fromDisk.sort(partitioningColumns)); + final Table fromDiskWithMetadata = readTable(new File(parentDir, "_common_metadata")); + assertTableEquals(bigDecimalFixedInputData.sort(partitioningColumns), + fromDiskWithMetadata.sort(partitioningColumns)); + FileUtils.deleteRecursively(parentDir); + } - FileUtils.deleteRecursively(parentDir); + // Next API we test is to pass additional indexing columns + final String indexColumn = "NPC5"; + final String[][] indexColumns = new String[][] {{indexColumn}}; + { + writeKeyValuePartitionedTable(inputData, parentDir.getPath(), writeInstructions, indexColumns); + assertFalse(DataIndexer.hasDataIndex(inputData, indexColumn)); + final Table fromDisk = readKeyValuePartitionedTable(parentDir, EMPTY); + verifyIndexingInfoExists(fromDisk, indexColumn); + assertTableEquals(bigDecimalFixedInputData.sort(partitioningColumns), fromDisk.sort(partitioningColumns)); + final Table fromDiskWithMetadata = readTable(new File(parentDir, "_common_metadata")); + assertTableEquals(bigDecimalFixedInputData.sort(partitioningColumns), + fromDiskWithMetadata.sort(partitioningColumns)); + FileUtils.deleteRecursively(parentDir); + } // Next API we test is passing the partitioned table without any table definition final PartitionedTable partitionedTable = inputData.partitionBy("PC1"); - writeKeyValuePartitionedTable(partitionedTable, parentDir.getPath(), writeInstructions); - fromDisk = readKeyValuePartitionedTable(parentDir, EMPTY).select(); - assertTableEquals(inputData.sort(partitioningColumns), fromDisk.sort(partitioningColumns)); - fromDiskWithMetadata = readTable(new File(parentDir, "_common_metadata")); - assertTableEquals(inputData.sort(partitioningColumns), fromDiskWithMetadata.sort(partitioningColumns)); + { + writeKeyValuePartitionedTable(partitionedTable, parentDir.getPath(), writeInstructions); + final Table fromDisk = readKeyValuePartitionedTable(parentDir, EMPTY).select(); + assertTableEquals(bigDecimalFixedInputData.sort(partitioningColumns), fromDisk.sort(partitioningColumns)); + final Table fromDiskWithMetadata = readTable(new File(parentDir, "_common_metadata")); + assertTableEquals(bigDecimalFixedInputData.sort(partitioningColumns), + fromDiskWithMetadata.sort(partitioningColumns)); + FileUtils.deleteRecursively(parentDir); + } - FileUtils.deleteRecursively(parentDir); + // Next API we test is to pass additional indexing columns with 
partitioned table and no definition + { + writeKeyValuePartitionedTable(partitionedTable, parentDir.getPath(), writeInstructions, indexColumns); + final Table fromDisk = readKeyValuePartitionedTable(parentDir, EMPTY); + verifyIndexingInfoExists(fromDisk, "NPC5"); + assertTableEquals(bigDecimalFixedInputData.sort(partitioningColumns), fromDisk.sort(partitioningColumns)); + final Table fromDiskWithMetadata = readTable(new File(parentDir, "_common_metadata")); + assertTableEquals(bigDecimalFixedInputData.sort(partitioningColumns), + fromDiskWithMetadata.sort(partitioningColumns)); + FileUtils.deleteRecursively(parentDir); + } // Next API we test is passing the regular table with an updated table definition where we drop // some partitioning columns and non-partitioning columns and add some new non-partitioning columns @@ -1099,21 +1134,50 @@ public void testAllNonPartitioningColumnTypes() { .collect(Collectors.toList()); newColumns.add(ColumnDefinition.ofInt("NPC15")); final TableDefinition newDefinition = TableDefinition.of(newColumns); - writeKeyValuePartitionedTable(inputData, newDefinition, parentDir.getAbsolutePath(), writeInstructions); - final Table expected = inputData.dropColumns("PC2", "NPC6").updateView("NPC15 = (int)null"); - fromDisk = readKeyValuePartitionedTable(parentDir, EMPTY).select(); - assertTableEquals(expected.sort("PC1"), fromDisk.sort("PC1")); - fromDiskWithMetadata = readTable(new File(parentDir, "_common_metadata")); - assertTableEquals(expected.sort("PC1"), fromDiskWithMetadata.sort("PC1")); + final Table expected = bigDecimalFixedInputData.dropColumns("PC2", "NPC6").updateView("NPC15 = (int)null"); + { + writeKeyValuePartitionedTable(inputData, newDefinition, parentDir.getAbsolutePath(), writeInstructions); + final Table fromDisk = readKeyValuePartitionedTable(parentDir, EMPTY).select(); + assertTableEquals(expected.sort("PC1"), fromDisk.sort("PC1")); + final Table fromDiskWithMetadata = readTable(new File(parentDir, "_common_metadata")); + assertTableEquals(expected.sort("PC1"), fromDiskWithMetadata.sort("PC1")); + FileUtils.deleteRecursively(parentDir); + } - FileUtils.deleteRecursively(parentDir); + // Next API to test takes table with updated definition and additional indexing columns + { + writeKeyValuePartitionedTable(inputData, newDefinition, parentDir.getAbsolutePath(), writeInstructions, + indexColumns); + assertFalse(DataIndexer.hasDataIndex(inputData, indexColumn)); + final Table fromDisk = readKeyValuePartitionedTable(parentDir, EMPTY); + verifyIndexingInfoExists(fromDisk, indexColumn); + assertTableEquals(expected.sort("PC1"), fromDisk.sort("PC1")); + final Table fromDiskWithMetadata = readTable(new File(parentDir, "_common_metadata")); + assertTableEquals(expected.sort("PC1"), fromDiskWithMetadata.sort("PC1")); + FileUtils.deleteRecursively(parentDir); + } // Next API we test is passing the partitioned table with an updated table definition - writeKeyValuePartitionedTable(partitionedTable, newDefinition, parentDir.getPath(), writeInstructions); - fromDisk = readKeyValuePartitionedTable(parentDir, EMPTY).select(); - assertTableEquals(expected.sort("PC1"), fromDisk.sort("PC1")); - fromDiskWithMetadata = readTable(new File(parentDir, "_common_metadata")); - assertTableEquals(expected.sort("PC1"), fromDiskWithMetadata.sort("PC1")); + { + writeKeyValuePartitionedTable(partitionedTable, newDefinition, parentDir.getPath(), writeInstructions); + final Table fromDisk = readKeyValuePartitionedTable(parentDir, EMPTY).select(); + 
assertTableEquals(expected.sort("PC1"), fromDisk.sort("PC1")); + final Table fromDiskWithMetadata = readTable(new File(parentDir, "_common_metadata")); + assertTableEquals(expected.sort("PC1"), fromDiskWithMetadata.sort("PC1")); + FileUtils.deleteRecursively(parentDir); + } + + // Next API we test is passing the indexing columns with a partitioned table and an updated table definition + { + writeKeyValuePartitionedTable(partitionedTable, newDefinition, parentDir.getPath(), writeInstructions, + indexColumns); + final Table fromDisk = readKeyValuePartitionedTable(parentDir, EMPTY); + verifyIndexingInfoExists(fromDisk, "NPC5"); + assertTableEquals(expected.sort("PC1"), fromDisk.sort("PC1")); + final Table fromDiskWithMetadata = readTable(new File(parentDir, "_common_metadata")); + assertTableEquals(expected.sort("PC1"), fromDiskWithMetadata.sort("PC1")); + FileUtils.deleteRecursively(parentDir); + } }
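
Reviewer note: a minimal usage sketch of the overloads introduced above, not part of the change itself. It assumes an initialized Deephaven engine/ExecutionContext, and the table, column, and path names ("PC1", "Sym", "Qty", "/tmp/partitioned_output") are illustrative only; the API shapes (writeKeyValuePartitionedTable taking a String[][] of index columns, readKeyValuePartitionedTable, DataIndexer.getOrCreateDataIndex/hasDataIndex) come from this diff.

import java.io.File;

import io.deephaven.engine.table.ColumnDefinition;
import io.deephaven.engine.table.Table;
import io.deephaven.engine.table.TableDefinition;
import io.deephaven.engine.table.impl.indexer.DataIndexer;
import io.deephaven.engine.util.TableTools;
import io.deephaven.parquet.table.ParquetInstructions;
import io.deephaven.parquet.table.ParquetTools;

public class KeyValuePartitionedIndexExample {
    public static void main(final String[] args) {
        // A definition with one partitioning column ("PC1") and two data columns (illustrative names).
        final TableDefinition definition = TableDefinition.of(
                ColumnDefinition.ofInt("PC1").withPartitioning(),
                ColumnDefinition.ofString("Sym"),
                ColumnDefinition.ofLong("Qty"));
        final Table source = TableTools.emptyTable(1_000)
                .update("PC1 = (int) (ii % 3)",
                        "Sym = ii % 2 == 0 ? `AAPL` : `MSFT`",
                        "Qty = ii")
                .withDefinitionUnsafe(definition);

        // Optional: materialize a data index up front. Per the javadoc in this change, an index
        // that is requested but missing will be computed on demand during the write.
        DataIndexer.getOrCreateDataIndex(source, "Sym");

        // New overload: write "PC1=<value>" partitions plus a sidecar index table on "Sym".
        final String[][] indexColumns = {{"Sym"}};
        final File destination = new File("/tmp/partitioned_output");
        ParquetTools.writeKeyValuePartitionedTable(
                source, destination.getPath(), ParquetInstructions.EMPTY, indexColumns);

        // Read the key-value partitioned data back; the sidecar index should be picked up.
        final Table fromDisk = ParquetTools.readKeyValuePartitionedTable(
                destination, ParquetInstructions.EMPTY);
        System.out.println("Index present on read: " + DataIndexer.hasDataIndex(fromDisk, "Sym"));
    }
}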