Added new APIs for managing indexes
malhotrashivam committed Mar 25, 2024
1 parent f68551f commit b11ebd1
Showing 2 changed files with 243 additions and 46 deletions.
@@ -10,6 +10,7 @@
import io.deephaven.base.Pair;
import io.deephaven.base.verify.Require;
import io.deephaven.engine.context.ExecutionContext;
import io.deephaven.engine.liveness.LivenessScopeStack;
import io.deephaven.engine.primitive.iterator.CloseableIterator;
import io.deephaven.engine.table.ColumnDefinition;
import io.deephaven.engine.table.ColumnSource;
@@ -22,6 +23,7 @@
import io.deephaven.engine.updategraph.UpdateSourceRegistrar;
import io.deephaven.parquet.base.ParquetMetadataFileWriter;
import io.deephaven.parquet.base.NullParquetMetadataFileWriter;
import io.deephaven.util.SafeCloseable;
import io.deephaven.util.channel.SeekableChannelsProvider;
import io.deephaven.util.channel.SeekableChannelsProviderLoader;
import io.deephaven.util.channel.SeekableChannelsProviderPlugin;
@@ -88,6 +90,7 @@
public class ParquetTools {

private static final int MAX_PARTITIONING_LEVELS_INFERENCE = 32;
private static final String[][] EMPTY_INDEXES = new String[0][];

private ParquetTools() {}

@@ -506,7 +509,9 @@ private static List<ParquetTableWriter.IndexWritingInfo> indexInfoBuilderHelper(
* written as "key=value" format in a nested directory structure. To generate these individual partitions, this
* method will call {@link Table#partitionBy(String...) partitionBy} on all the partitioning columns of the
* provided table. The generated parquet files will have names of the format provided by
* {@link ParquetInstructions#baseNameForPartitionedParquetData()}.
* {@link ParquetInstructions#baseNameForPartitionedParquetData()}. Any indexes present on the source table will
* be written as sidecar tables. To write only a subset of the indexes, or to add additional indexes while writing,
* use {@link #writeKeyValuePartitionedTable(Table, String, ParquetInstructions, String[][])}.
*
* @param sourceTable The table to partition and write
* @param destinationDir The path to destination root directory to store partitioned data in nested format.
@@ -517,15 +522,41 @@ public static void writeKeyValuePartitionedTable(@NotNull final Table sourceTabl
@NotNull final String destinationDir,
@NotNull final ParquetInstructions writeInstructions) {
writeKeyValuePartitionedTable(sourceTable, sourceTable.getDefinition(), destinationDir,
writeInstructions);
writeInstructions, indexedColumnNames(sourceTable));
}

/**
* Write table to disk in parquet format with {@link TableDefinition#getPartitioningColumns() partitioning columns}
* written as "key=value" format in a nested directory structure. To generate these individual partitions, this
* method will call {@link Table#partitionBy(String...) partitionBy} on all the partitioning columns of the
* provided table. The generated parquet files will have names of the format provided by
* {@link ParquetInstructions#baseNameForPartitionedParquetData()}.
*
* @param sourceTable The table to partition and write
* @param destinationDir The path to destination root directory to store partitioned data in nested format.
* Non-existing directories are created.
* @param writeInstructions Write instructions for customizations while writing
* @param indexColumnArr Arrays containing the column names for indexes to persist. The write operation will store
* the index info as sidecar tables. This argument is used to narrow the set of indexes to write, or to be
* explicit about the expected set of indexes present on all sources. Indexes that are specified but missing
* will be computed on demand.
*/
public static void writeKeyValuePartitionedTable(@NotNull final Table sourceTable,
@NotNull final String destinationDir,
@NotNull final ParquetInstructions writeInstructions,
@Nullable final String[][] indexColumnArr) {
writeKeyValuePartitionedTable(sourceTable, sourceTable.getDefinition(), destinationDir,
writeInstructions, indexColumnArr);
}
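
A minimal usage sketch of this overload (hypothetical paths and column name; assumes the source table's definition already includes partitioning columns, e.g. a table read from an existing key-value partitioned store):

final Table source = ParquetTools.readTable("/data/input");
// Persist only the "Sym" index, even if more indexes exist on the source.
ParquetTools.writeKeyValuePartitionedTable(
        source,
        "/data/output",
        ParquetInstructions.EMPTY,
        new String[][] {{"Sym"}});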

/**
* Write table to disk in parquet format with {@link TableDefinition#getPartitioningColumns() partitioning columns}
* written as "key=value" format in a nested directory structure. To generate these individual partitions, this
* method will call {@link Table#partitionBy(String...) partitionBy} on all the partitioning columns in the provided
* table definition. The generated parquet files will have names of the format provided by
* {@link ParquetInstructions#baseNameForPartitionedParquetData()}.
* {@link ParquetInstructions#baseNameForPartitionedParquetData()}. Any indexes present on the source table will
* be written as sidecar tables. To write only a subset of the indexes, or to add additional indexes while writing,
* use {@link #writeKeyValuePartitionedTable(Table, TableDefinition, String, ParquetInstructions, String[][])}.
*
* @param sourceTable The table to partition and write
* @param definition table definition to use (instead of the one implied by the table itself)
@@ -537,6 +568,33 @@ public static void writeKeyValuePartitionedTable(@NotNull final Table sourceTabl
@NotNull final TableDefinition definition,
@NotNull final String destinationDir,
@NotNull final ParquetInstructions writeInstructions) {
writeKeyValuePartitionedTable(sourceTable, definition, destinationDir, writeInstructions,
indexedColumnNames(sourceTable));
}

/**
* Write table to disk in parquet format with {@link TableDefinition#getPartitioningColumns() partitioning columns}
* written as "key=value" format in a nested directory structure. To generate these individual partitions, this
* method will call {@link Table#partitionBy(String...) partitionBy} on all the partitioning columns in the provided
* table definition. The generated parquet files will have names of the format provided by
* {@link ParquetInstructions#baseNameForPartitionedParquetData()}.
*
* @param sourceTable The table to partition and write
* @param definition table definition to use (instead of the one implied by the table itself)
* @param destinationDir The path to destination root directory to store partitioned data in nested format.
* Non-existing directories are created.
* @param writeInstructions Write instructions for customizations while writing
* @param indexColumnArr Arrays containing the column names for indexes to persist. The write operation will store
* the index info as sidecar tables. This argument is used to narrow the set of indexes to write, or to be
* explicit about the expected set of indexes present on all sources. Indexes that are specified but missing
* will be computed on demand.
*/
public static void writeKeyValuePartitionedTable(@NotNull final Table sourceTable,
@NotNull final TableDefinition definition,
@NotNull final String destinationDir,
@NotNull final ParquetInstructions writeInstructions,
@Nullable final String[][] indexColumnArr) {
final List<ColumnDefinition<?>> partitioningColumns = definition.getPartitioningColumns();
if (partitioningColumns.isEmpty()) {
throw new IllegalArgumentException("Table must have partitioning columns to write partitioned data");
@@ -549,7 +607,7 @@ public static void writeKeyValuePartitionedTable(@NotNull final Table sourceTabl
final TableDefinition leafDefinition =
getNonKeyTableDefinition(new HashSet<>(Arrays.asList(partitioningColNames)), definition);
writeKeyValuePartitionedTableImpl(partitionedTable, keyTableDefinition, leafDefinition, destinationDir,
writeInstructions, Optional.of(sourceTable));
writeInstructions, indexColumnArr, Optional.of(sourceTable));
}
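
A sketch of this definition-based overload (hypothetical columns), marking "Region" as the partitioning column and persisting an index on "Sym":

final TableDefinition definition = TableDefinition.of(
        ColumnDefinition.ofString("Region").withPartitioning(),
        ColumnDefinition.ofString("Sym"),
        ColumnDefinition.ofDouble("Price"));
ParquetTools.writeKeyValuePartitionedTable(
        sourceTable, definition, "/data/output",
        ParquetInstructions.EMPTY, new String[][] {{"Sym"}});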

/**
@@ -566,12 +624,34 @@ public static void writeKeyValuePartitionedTable(@NotNull final Table sourceTabl
public static void writeKeyValuePartitionedTable(@NotNull final PartitionedTable partitionedTable,
@NotNull final String destinationDir,
@NotNull final ParquetInstructions writeInstructions) {
writeKeyValuePartitionedTable(partitionedTable, destinationDir, writeInstructions, EMPTY_INDEXES);
}

/**
* Write a partitioned table to disk in parquet format with all the {@link PartitionedTable#keyColumnNames() key
* columns} as "key=value" format in a nested directory structure. To generate the partitioned table, users can call
* {@link Table#partitionBy(String...) partitionBy} on the required columns. The generated parquet files will have
* names of the format provided by {@link ParquetInstructions#baseNameForPartitionedParquetData()}.
*
* @param partitionedTable The partitioned table to write
* @param destinationDir The path to destination root directory to store partitioned data in nested format.
* Non-existing directories are created.
* @param writeInstructions Write instructions for customizations while writing
* @param indexColumnArr Arrays containing the column names for indexes to persist. The write operation will store
* the index info as sidecar tables. This argument is used to narrow the set of indexes to write, or to be
* explicit about the expected set of indexes present on all sources. Indexes that are specified but missing
* will be computed on demand.
*/
public static void writeKeyValuePartitionedTable(@NotNull final PartitionedTable partitionedTable,
@NotNull final String destinationDir,
@NotNull final ParquetInstructions writeInstructions,
@Nullable final String[][] indexColumnArr) {
final TableDefinition keyTableDefinition = getKeyTableDefinition(partitionedTable.keyColumnNames(),
partitionedTable.table().getDefinition());
final TableDefinition leafDefinition = getNonKeyTableDefinition(partitionedTable.keyColumnNames(),
partitionedTable.constituentDefinition());
writeKeyValuePartitionedTableImpl(partitionedTable, keyTableDefinition, leafDefinition, destinationDir,
writeInstructions, Optional.empty());
writeInstructions, indexColumnArr, Optional.empty());
}
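
A sketch using an in-memory partitionBy result (hypothetical data; io.deephaven.engine.util.TableTools assumed on the classpath). "Region" becomes a key column, so the persisted indexes should name non-key columns such as "Sym":

final Table t = TableTools.emptyTable(1_000).update(
        "Region = ii % 2 == 0 ? `East` : `West`",
        "Sym = String.valueOf(ii % 10)",
        "Price = 0.5 * ii");
final PartitionedTable byRegion = t.partitionBy("Region");
ParquetTools.writeKeyValuePartitionedTable(
        byRegion, "/data/by_region", ParquetInstructions.EMPTY,
        new String[][] {{"Sym"}});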

/**
@@ -590,10 +670,34 @@ public static void writeKeyValuePartitionedTable(@NotNull final PartitionedTable
@NotNull final TableDefinition definition,
@NotNull final String destinationDir,
@NotNull final ParquetInstructions writeInstructions) {
writeKeyValuePartitionedTable(partitionedTable, definition, destinationDir, writeInstructions, EMPTY_INDEXES);
}

/**
* Write a partitioned table to disk in parquet format with all the {@link PartitionedTable#keyColumnNames() key
* columns} as "key=value" format in a nested directory structure. To generate the partitioned table, users can call
* {@link Table#partitionBy(String...) partitionBy} on the required columns. The generated parquet files will have
* names of the format provided by {@link ParquetInstructions#baseNameForPartitionedParquetData()}.
*
* @param partitionedTable The partitioned table to write
* @param definition table definition to use (instead of the one implied by the table itself)
* @param destinationDir The path to destination root directory to store partitioned data in nested format.
* Non-existing directories are created.
* @param writeInstructions Write instructions for customizations while writing
* @param indexColumnArr Arrays containing the column names for indexes to persist. The write operation will store
* the index info as sidecar tables. This argument is used to narrow the set of indexes to write, or to be
* explicit about the expected set of indexes present on all sources. Indexes that are specified but missing
* will be computed on demand.
*/
public static void writeKeyValuePartitionedTable(@NotNull final PartitionedTable partitionedTable,
@NotNull final TableDefinition definition,
@NotNull final String destinationDir,
@NotNull final ParquetInstructions writeInstructions,
@Nullable final String[][] indexColumnArr) {
final TableDefinition keyTableDefinition = getKeyTableDefinition(partitionedTable.keyColumnNames(), definition);
final TableDefinition leafDefinition = getNonKeyTableDefinition(partitionedTable.keyColumnNames(), definition);
writeKeyValuePartitionedTableImpl(partitionedTable, keyTableDefinition, leafDefinition, destinationDir,
writeInstructions, Optional.empty());
writeInstructions, indexColumnArr, Optional.empty());
}

/**
@@ -605,6 +709,10 @@ public static void writeKeyValuePartitionedTable(@NotNull final PartitionedTable
* @param leafDefinition The definition for leaf parquet files to be written
* @param destinationRoot The path to destination root directory to store partitioned data in nested format
* @param writeInstructions Write instructions for customizations while writing
* @param indexColumnArr Arrays containing the column names for indexes to persist. The write operation will store
* the index info as sidecar tables. This argument is used to narrow the set of indexes to write, or to be
* explicit about the expected set of indexes present on all sources. Indexes that are specified but missing
* will be computed on demand.
* @param sourceTable The optional source table, provided when the caller supplies a merged source table to write, as in
* {@link #writeKeyValuePartitionedTable(Table, String, ParquetInstructions)} and
* {@link #writeKeyValuePartitionedTable(Table, TableDefinition, String, ParquetInstructions)}
@@ -614,6 +722,7 @@ private static void writeKeyValuePartitionedTableImpl(@NotNull final Partitioned
@NotNull final TableDefinition leafDefinition,
@NotNull final String destinationRoot,
@NotNull final ParquetInstructions writeInstructions,
@Nullable final String[][] indexColumnArr,
@NotNull final Optional<Table> sourceTable) {
if (leafDefinition.numColumns() == 0) {
throw new IllegalArgumentException("Cannot write a partitioned parquet table without any non-partitioning "
@@ -692,10 +801,24 @@ private static void writeKeyValuePartitionedTableImpl(@NotNull final Partitioned
final Map<String, Map<ParquetCacheTags, Object>> computedCache =
buildComputedCache(() -> sourceTable.orElseGet(partitionedTable::merge), leafDefinition);
final Table[] partitionedDataArray = partitionedData.toArray(Table[]::new);
// TODO Verify correctness with Ryan/Larry
writeParquetTablesImpl(partitionedDataArray, leafDefinition, writeInstructions,
destinations.toArray(File[]::new), indexedColumnNames(partitionedDataArray), partitioningColumnsSchema,
new File(destinationRoot), computedCache);
try (final SafeCloseable ignored = LivenessScopeStack.open()) {
// TODO(deephaven-core#5292): Optimize creating index on constituent tables
addIndexesToTables(partitionedDataArray, indexColumnArr);
writeParquetTablesImpl(partitionedDataArray, leafDefinition, writeInstructions,
destinations.toArray(File[]::new), indexColumnArr, partitioningColumnsSchema,
new File(destinationRoot), computedCache);
}
}

private static void addIndexesToTables(@NotNull final Table[] tables,
@Nullable final String[][] indexColumnArr) {
if (indexColumnArr != null && indexColumnArr.length != 0) {
for (final Table table : tables) {
for (final String[] indexCols : indexColumnArr) {
DataIndexer.getOrCreateDataIndex(table, indexCols);
}
}
}
}
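
Callers can also pre-compute an index so the write reuses it instead of building one on demand; a one-line sketch, assuming a "Sym" column:

// Create (or fetch) a data index keyed on "Sym" before writing.
DataIndexer.getOrCreateDataIndex(table, "Sym");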

/**
@@ -939,22 +1062,32 @@ private static void writeParquetTablesImpl(@NotNull final Table[] sources,
* @implNote This only examines the first source table. The writing code will compute missing indexes for the other
* source tables.
*/
private static String[][] indexedColumnNames(@NotNull Table @NotNull [] sources) {
@NotNull
private static String[][] indexedColumnNames(@NotNull final Table @NotNull [] sources) {
if (sources.length == 0) {
return new String[0][];
return EMPTY_INDEXES;
}

// Use the first table as the source of indexed columns
final Table firstTable = sources[0];
final DataIndexer dataIndexer = DataIndexer.existingOf(firstTable.getRowSet());
return indexedColumnNames(sources[0]);
}

/**
* Examine the source table to retrieve the list of indexes as String[] arrays.
*
* @param source The table from which to retrieve the indexes
* @return An array containing the indexes as String[] arrays.
*/
@NotNull
private static String[][] indexedColumnNames(@NotNull final Table source) {
final DataIndexer dataIndexer = DataIndexer.existingOf(source.getRowSet());
if (dataIndexer == null) {
return new String[0][];
return EMPTY_INDEXES;
}
final List<DataIndex> dataIndexes = dataIndexer.dataIndexes(true);
if (dataIndexes.isEmpty()) {
return new String[0][];
return EMPTY_INDEXES;
}
final Map<String, ? extends ColumnSource<?>> nameToColumn = firstTable.getColumnSourceMap();
final Map<String, ? extends ColumnSource<?>> nameToColumn = source.getColumnSourceMap();
// We disregard collisions here; any mapped name is an adequate choice.
final Map<ColumnSource<?>, String> columnToName = nameToColumn.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getValue, Map.Entry::getKey));