The Databricks 9.1 runtime changed the clipParquetSchema API: when matching is case-insensitive, it now returns a clipped schema whose field names follow the readSchema's spelling. This causes clipBlocks to return incorrect results, because clipBlocks only performs case-sensitive matching against the file's column paths.

Signed-off-by: Bobby Wang <wbo4958@gmail.com>

Fixes NVIDIA#4069
NvTimLiu committed Nov 11, 2021
1 parent 0bf97aa commit 7768e9c
Showing 1 changed file with 16 additions and 5 deletions.
@@ -378,7 +378,8 @@ private case class GpuParquetFileFilterHandler(@transient sqlConf: SQLConf) exte
     val clippedSchema = GpuParquetPartitionReaderFactoryBase.filterClippedSchema(clippedSchemaTmp,
       fileSchema, isCaseSensitive)
     val columnPaths = clippedSchema.getPaths.asScala.map(x => ColumnPath.get(x: _*))
-    val clipped = ParquetPartitionReader.clipBlocks(columnPaths, blocks.asScala)
+    val clipped = ParquetPartitionReader.clipBlocks(columnPaths, blocks.asScala, isCaseSensitive)
 
     ParquetFileInfoWithBlockMeta(filePath, clipped, file.partitionValues,
       clippedSchema, isCorrectedInt96RebaseForThisFile, isCorrectedRebaseForThisFile,
       hasInt96Timestamps)
@@ -1545,16 +1546,26 @@ object ParquetPartitionReader {
    *
    * @param columnPaths the paths of columns to preserve
    * @param blocks the block metadata from the original Parquet file
+   * @param isCaseSensitive whether the column matching is case sensitive
    * @return the updated block metadata with undesired column chunks removed
    */
   private[spark] def clipBlocks(columnPaths: Seq[ColumnPath],
-      blocks: Seq[BlockMetaData]): Seq[BlockMetaData] = {
-    val pathSet = columnPaths.toSet
+      blocks: Seq[BlockMetaData], isCaseSensitive: Boolean): Seq[BlockMetaData] = {
+    val pathSet = if (isCaseSensitive) {
+      columnPaths.map(cp => cp.toDotString).toSet
+    } else {
+      columnPaths.map(cp => cp.toDotString.toLowerCase(Locale.ROOT)).toSet
+    }
     blocks.map(oldBlock => {
       //noinspection ScalaDeprecation
-      val newColumns = oldBlock.getColumns.asScala.filter(c => pathSet.contains(c.getPath))
+      val newColumns = if (isCaseSensitive) {
+        oldBlock.getColumns.asScala.filter(c =>
+          pathSet.contains(c.getPath.toDotString))
+      } else {
+        oldBlock.getColumns.asScala.filter(c =>
+          pathSet.contains(c.getPath.toDotString.toLowerCase(Locale.ROOT)))
+      }
       ParquetPartitionReader.newParquetBlock(oldBlock.getRowCount, newColumns)
     })
   }
 }
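For reference, here is a minimal, self-contained sketch of the matching behavior this patch introduces. It is not the repository code: ClipBlocksSketch, clipColumns, and normalize are illustrative names, and plain dotted strings stand in for parquet-mr's ColumnPath.toDotString values.

    // Illustration only, not spark-rapids code: shows why case-sensitive set
    // membership misses columns when the read schema and file schema differ
    // only in case, and how lowercasing both sides with Locale.ROOT fixes it.
    import java.util.Locale

    object ClipBlocksSketch {
      // Normalize a dotted column path according to the case-sensitivity setting.
      private def normalize(path: String, isCaseSensitive: Boolean): String =
        if (isCaseSensitive) path else path.toLowerCase(Locale.ROOT)

      // Keep only the column paths from the file that match the requested paths.
      def clipColumns(
          columnPaths: Seq[String],
          blockColumns: Seq[String],
          isCaseSensitive: Boolean): Seq[String] = {
        val pathSet = columnPaths.map(normalize(_, isCaseSensitive)).toSet
        blockColumns.filter(c => pathSet.contains(normalize(c, isCaseSensitive)))
      }

      def main(args: Array[String]): Unit = {
        val wanted = Seq("a.b", "C.d")      // paths spelled as in the read schema
        val inFile = Seq("A.B", "c.D", "e") // paths spelled as in the Parquet file

        // Case-sensitive matching drops every column chunk: List()
        println(clipColumns(wanted, inFile, isCaseSensitive = true))
        // Case-insensitive matching keeps the intended columns: List(A.B, c.D)
        println(clipColumns(wanted, inFile, isCaseSensitive = false))
      }
    }

With case-sensitive matching, the read-schema spellings miss every column chunk in the file, which is the failure described in the commit message; normalizing both sides with Locale.ROOT before the set lookup restores the match.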
