Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add in nvtx ranges for parquet filterBlocks #5073

Merged
merged 2 commits into from
Mar 28, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -366,56 +366,67 @@ private case class GpuParquetFileFilterHandler(@transient sqlConf: SQLConf) exte
conf : Configuration,
filters: Array[Filter],
readDataSchema: StructType): ParquetFileInfoWithBlockMeta = {
withResource(new NvtxRange("filterBlocks", NvtxColor.PURPLE)) { _ =>
val filePath = new Path(new URI(file.filePath))
//noinspection ScalaDeprecation
val footer = withResource(new NvtxRange("readFooter", NvtxColor.YELLOW)) { _ =>
ParquetFileReader.readFooter(conf, filePath,
ParquetMetadataConverter.range(file.start, file.start + file.length))
}
val fileSchema = footer.getFileMetaData.getSchema
val pushedFilters = if (enableParquetFilterPushDown) {
val parquetFilters = SparkShimImpl.getParquetFilters(fileSchema, pushDownDate,
pushDownTimestamp, pushDownDecimal, pushDownStringStartWith, pushDownInFilterThreshold,
isCaseSensitive, footer.getFileMetaData.getKeyValueMetaData.get, rebaseMode)
filters.flatMap(parquetFilters.createFilter).reduceOption(FilterApi.and)
} else {
None
}

val filePath = new Path(new URI(file.filePath))
//noinspection ScalaDeprecation
val footer = ParquetFileReader.readFooter(conf, filePath,
ParquetMetadataConverter.range(file.start, file.start + file.length))
val fileSchema = footer.getFileMetaData.getSchema
val pushedFilters = if (enableParquetFilterPushDown) {
val parquetFilters = SparkShimImpl.getParquetFilters(fileSchema, pushDownDate,
pushDownTimestamp, pushDownDecimal, pushDownStringStartWith, pushDownInFilterThreshold,
isCaseSensitive, footer.getFileMetaData.getKeyValueMetaData.get, rebaseMode)
filters.flatMap(parquetFilters.createFilter).reduceOption(FilterApi.and)
} else {
None
}

val hasInt96Timestamps = isParquetTimeInInt96(fileSchema)
val hasInt96Timestamps = isParquetTimeInInt96(fileSchema)

val isCorrectedRebaseForThisFile =
GpuParquetPartitionReaderFactoryBase.isCorrectedRebaseMode(
footer.getFileMetaData.getKeyValueMetaData.get, isCorrectedRebase)
val isCorrectedRebaseForThisFile =
GpuParquetPartitionReaderFactoryBase.isCorrectedRebaseMode(
footer.getFileMetaData.getKeyValueMetaData.get, isCorrectedRebase)

val isCorrectedInt96RebaseForThisFile =
GpuParquetPartitionReaderFactoryBase.isCorrectedInt96RebaseMode(
footer.getFileMetaData.getKeyValueMetaData.get, isInt96CorrectedRebase)
val isCorrectedInt96RebaseForThisFile =
GpuParquetPartitionReaderFactoryBase.isCorrectedInt96RebaseMode(
footer.getFileMetaData.getKeyValueMetaData.get, isInt96CorrectedRebase)

val blocks = if (pushedFilters.isDefined) {
// Use the ParquetFileReader to perform dictionary-level filtering
ParquetInputFormat.setFilterPredicate(conf, pushedFilters.get)
//noinspection ScalaDeprecation
withResource(new ParquetFileReader(conf, footer.getFileMetaData, filePath,
footer.getBlocks, Collections.emptyList[ColumnDescriptor])) { parquetReader =>
parquetReader.getRowGroups
val blocks = if (pushedFilters.isDefined) {
withResource(new NvtxRange("getBlocksWithFilter", NvtxColor.CYAN)) { _ =>
// Use the ParquetFileReader to perform dictionary-level filtering
ParquetInputFormat.setFilterPredicate(conf, pushedFilters.get)
//noinspection ScalaDeprecation
withResource(new ParquetFileReader(conf, footer.getFileMetaData, filePath,
footer.getBlocks, Collections.emptyList[ColumnDescriptor])) { parquetReader =>
parquetReader.getRowGroups
}
}
} else {
footer.getBlocks
}
} else {
footer.getBlocks
}

val clippedSchemaTmp = ParquetReadSupport.clipParquetSchema(fileSchema, readDataSchema,
isCaseSensitive)
// ParquetReadSupport.clipParquetSchema does most of what we want, but it includes
// everything in readDataSchema, even if it is not in fileSchema. We want to remove those
// for our own purposes.
val clippedSchema = GpuParquetPartitionReaderFactoryBase.filterClippedSchema(clippedSchemaTmp,
fileSchema, isCaseSensitive)
val columnPaths = clippedSchema.getPaths.asScala.map(x => ColumnPath.get(x: _*))
val clipped = ParquetPartitionReader.clipBlocks(columnPaths, blocks.asScala, isCaseSensitive)

ParquetFileInfoWithBlockMeta(filePath, clipped, file.partitionValues,
clippedSchema, isCorrectedInt96RebaseForThisFile, isCorrectedRebaseForThisFile,
hasInt96Timestamps)
val (clipped, clippedSchema) =
withResource(new NvtxRange("clipSchema", NvtxColor.DARK_GREEN)) { _ =>
val clippedSchemaTmp = ParquetReadSupport.clipParquetSchema(fileSchema, readDataSchema,
isCaseSensitive)
// ParquetReadSupport.clipParquetSchema does most of what we want, but it includes
// everything in readDataSchema, even if it is not in fileSchema. We want to remove those
// for our own purposes.
val clippedSchema =
GpuParquetPartitionReaderFactoryBase.filterClippedSchema(clippedSchemaTmp,
fileSchema, isCaseSensitive)
val columnPaths = clippedSchema.getPaths.asScala.map(x => ColumnPath.get(x: _*))
val clipped =
ParquetPartitionReader.clipBlocks(columnPaths, blocks.asScala, isCaseSensitive)
(clipped, clippedSchema)
}

ParquetFileInfoWithBlockMeta(filePath, clipped, file.partitionValues,
clippedSchema, isCorrectedInt96RebaseForThisFile, isCorrectedRebaseForThisFile,
hasInt96Timestamps)
}
}
}

Expand Down