[SPARK-38159][SQL] Add a new FileSourceMetadataAttribute for the Hidden File Metadata

### What changes were proposed in this pull request?
Add a new `FileSourceMetadataAttribute` object with an `apply` method to create a file source metadata attribute reference, and an `unapply` method that matches only file source metadata attributes.
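
For illustration, a minimal sketch of how the new extractor is intended to be used at a call site (e.g. in a spark-shell session with the catalyst classes on the classpath; the plan output and column names below are made up):

```scala
import org.apache.spark.sql.catalyst.expressions.{
  Attribute, AttributeReference, FileSourceMetadataAttribute}
import org.apache.spark.sql.types.StringType

// A hypothetical plan output mixing a regular column and a file source metadata column.
val output: Seq[Attribute] = Seq(
  AttributeReference("value", StringType)(),
  FileSourceMetadataAttribute("file_name", StringType))

// `unapply` matches only attributes tagged as file source metadata,
// so the regular `value` column is left out.
val metadataCols: Seq[AttributeReference] =
  output.collect { case FileSourceMetadataAttribute(attr) => attr }
```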

### Why are the changes needed?
An extra safeguard to make sure we only match file source metadata attributes.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Existing UTs

Closes #35459 from Yaohua628/spark-38159.

Authored-by: yaohua <yaohua.zhao@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
Yaohua628 authored and cloud-fan committed Feb 11, 2022
1 parent 93251ed commit 7ed51bb
Showing 6 changed files with 38 additions and 10 deletions.
@@ -434,8 +434,8 @@ object VirtualColumn {
 }
 
 /**
- * The internal representation of the hidden metadata struct:
- * set `__metadata_col` to `true` in AttributeReference metadata
+ * The internal representation of the MetadataAttribute,
+ * it sets `__metadata_col` to `true` in AttributeReference metadata
  * - apply() will create a metadata attribute reference
  * - unapply() will check if an attribute reference is the metadata attribute reference
  */
@@ -451,3 +451,28 @@ object MetadataAttribute {
     } else None
   }
 }
+
+/**
+ * The internal representation of the FileSourceMetadataAttribute, it sets `__metadata_col`
+ * and `__file_source_metadata_col` to `true` in AttributeReference's metadata
+ * - apply() will create a file source metadata attribute reference
+ * - unapply() will check if an attribute reference is the file source metadata attribute reference
+ */
+object FileSourceMetadataAttribute {
+
+  val FILE_SOURCE_METADATA_COL_ATTR_KEY = "__file_source_metadata_col"
+
+  def apply(name: String, dataType: DataType, nullable: Boolean = true): AttributeReference =
+    AttributeReference(name, dataType, nullable,
+      new MetadataBuilder()
+        .putBoolean(METADATA_COL_ATTR_KEY, value = true)
+        .putBoolean(FILE_SOURCE_METADATA_COL_ATTR_KEY, value = true).build())()
+
+  def unapply(attr: AttributeReference): Option[AttributeReference] =
+    attr match {
+      case MetadataAttribute(attr)
+        if attr.metadata.contains(FILE_SOURCE_METADATA_COL_ATTR_KEY)
+          && attr.metadata.getBoolean(FILE_SOURCE_METADATA_COL_ATTR_KEY) => Some(attr)
+      case _ => None
+    }
+}
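
For context (not part of the commit), a rough sketch of how the new extractor differs from the existing `MetadataAttribute` one, assuming a spark-shell session with these catalyst classes on the classpath; the column names are made up:

```scala
import org.apache.spark.sql.catalyst.expressions.{
  AttributeReference, FileSourceMetadataAttribute, MetadataAttribute}
import org.apache.spark.sql.types.LongType

// Carries both `__metadata_col` and `__file_source_metadata_col` = true.
val fileSourceAttr: AttributeReference = FileSourceMetadataAttribute("file_size", LongType)

// Carries only `__metadata_col` = true.
val plainMetadataAttr: AttributeReference = MetadataAttribute("some_metadata_col", LongType)

// The existing, broader extractor matches both kinds of metadata attributes ...
assert(MetadataAttribute.unapply(fileSourceAttr).isDefined)
assert(MetadataAttribute.unapply(plainMetadataAttr).isDefined)

// ... while the new extractor is the extra safeguard: it matches only attributes
// explicitly tagged as file source metadata.
assert(FileSourceMetadataAttribute.unapply(fileSourceAttr).isDefined)
assert(FileSourceMetadataAttribute.unapply(plainMetadataAttr).isEmpty)
```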
@@ -200,7 +200,7 @@ case class FileSourceScanExec(
   extends DataSourceScanExec {
 
   lazy val metadataColumns: Seq[AttributeReference] =
-    output.collect { case MetadataAttribute(attr) => attr }
+    output.collect { case FileSourceMetadataAttribute(attr) => attr }
 
   // Note that some vals referring the file-based relation are lazy intentionally
   // so that this plan can be canonicalized on executor side too. See SPARK-23731.
@@ -366,9 +366,11 @@ case class FileSourceScanExec(
   @transient
   private lazy val pushedDownFilters = {
     val supportNestedPredicatePushdown = DataSourceUtils.supportNestedPredicatePushdown(relation)
-    // TODO: should be able to push filters containing metadata columns down to skip files
+    // `dataFilters` should not include any metadata col filters
+    // because the metadata struct has been flatted in FileSourceStrategy
+    // and thus metadata col filters are invalid to be pushed down
     dataFilters.filterNot(_.references.exists {
-      case MetadataAttribute(_) => true
+      case FileSourceMetadataAttribute(_) => true
       case _ => false
     }).flatMap(DataSourceStrategy.translateFilter(_, supportNestedPredicatePushdown))
   }
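
To make the new comment concrete, a hedged sketch of the same `filterNot` applied to made-up filters (the attributes and predicates are illustrative only, not taken from the commit):

```scala
import org.apache.spark.sql.catalyst.expressions.{
  AttributeReference, EqualTo, Expression, FileSourceMetadataAttribute, GreaterThan, Literal}
import org.apache.spark.sql.types.{LongType, StringType}

// One regular data column and one (flattened) file source metadata column.
val id = AttributeReference("id", LongType)()
val fileName = FileSourceMetadataAttribute("file_name", StringType)

val dataFilters: Seq[Expression] = Seq(
  GreaterThan(id, Literal(0L)),
  EqualTo(fileName, Literal("part-00000.parquet")))

// Same filterNot as in pushedDownFilters above: the `file_name = ...` predicate is
// dropped because it references a file source metadata attribute, leaving only
// `id > 0` as a pushdown candidate.
val pushdownCandidates = dataFilters.filterNot(_.references.exists {
  case FileSourceMetadataAttribute(_) => true
  case _ => false
})
assert(pushdownCandidates.size == 1)
```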
@@ -192,7 +192,8 @@ object FileFormat {
     .add(StructField(FILE_MODIFICATION_TIME, TimestampType))
 
   // create a file metadata struct col
-  def createFileMetadataCol: AttributeReference = MetadataAttribute(METADATA_NAME, METADATA_STRUCT)
+  def createFileMetadataCol: AttributeReference =
+    FileSourceMetadataAttribute(METADATA_NAME, METADATA_STRUCT)
 
   // create an internal row given required metadata fields and file information
   def createMetadataInternalRow(
@@ -214,12 +214,12 @@ object FileSourceStrategy extends Strategy with PredicateHelper with Logging {
       logInfo(s"Output Data Schema: ${outputSchema.simpleString(5)}")
 
       val metadataStructOpt = l.output.collectFirst {
-        case MetadataAttribute(attr) => attr
+        case FileSourceMetadataAttribute(attr) => attr
       }
 
       val metadataColumns = metadataStructOpt.map { metadataStruct =>
         metadataStruct.dataType.asInstanceOf[StructType].fields.map { field =>
-          MetadataAttribute(field.name, field.dataType)
+          FileSourceMetadataAttribute(field.name, field.dataType)
         }.toSeq
       }.getOrElse(Seq.empty)
 
@@ -75,7 +75,7 @@ abstract class PartitioningAwareFileIndex(
 
     // retrieve the file metadata filters and reduce to a final filter expression
     val fileMetadataFilterOpt = dataFilters.filter(_.references.forall {
-      case MetadataAttribute(_) => true
+      case FileSourceMetadataAttribute(_) => true
       case _ => false
     }).reduceOption(expressions.And)
 
@@ -77,7 +77,7 @@ object SchemaPruning extends Rule[LogicalPlan] {
     }
 
     val metadataSchema =
-      relation.output.collect { case MetadataAttribute(attr) => attr }.toStructType
+      relation.output.collect { case FileSourceMetadataAttribute(attr) => attr }.toStructType
     val prunedMetadataSchema = if (metadataSchema.nonEmpty) {
       pruneSchema(metadataSchema, requestedRootFields)
     } else {