[HUDI-7559] [1/n] Fix RecordLevelIndexSupport::filterQueryWithRecordKey (#10947)

RecordLevelIndexSupport::filterQueryWithRecordKey() throws an NPE if the EqualTo
query predicate is not of the form `AttributeReference = Literal`. This is because
RecordLevelIndexSupport::getAttributeLiteralTuple() returns null in such cases, which
is then dereferenced unconditionally.
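
For illustration, here is a minimal, self-contained sketch of the failure pattern and the guard added in this PR; the Option value below is a stand-in for what getAttributeLiteralTuple() returns, not the actual Hudi/Spark types:

```scala
object OrNullSketch extends App {
  // Stand-in for getAttributeLiteralTuple() when the predicate is not of the
  // form `AttributeReference = Literal`: the helper returns an empty Option.
  val tuple: Option[(String, String)] = Option.empty

  // Pre-fix pattern: `.orNull` yields null, and dereferencing the null tuple
  // fails at runtime (the NPE described above).
  // val (attribute, literal) = tuple.orNull

  // Post-fix pattern: check for null before touching the tuple's components.
  val attributeLiteralTuple = tuple.orNull
  val result: Option[List[String]] =
    if (attributeLiteralTuple != null) Some(List(attributeLiteralTuple._2))
    else Option.empty

  println(result) // prints None for a non-matching predicate
}
```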

This bug prevented the functional index from being used even when the query predicate
contained the Spark functions on which the functional index is built. As a result, the
column-stats based functional index was not pruning files.

This PR makes the following minor changes:
1. Move some methods in RecordLevelIndexSupport into an object to make them static (to aid unit testing)
2. Fix filterQueryWithRecordKey() by checking for a null return value from the call to getAttributeLiteralTuple (see the usage sketch after this list)
3. Add unit tests in TestRecordLevelIndexSupport.scala
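
As a usage sketch of the fixed, now-static method (mirroring Case 1 of the new test below), a predicate whose left operand is a function call rather than an attribute no longer throws and instead yields an empty Option:

```scala
import java.util.TimeZone

import org.apache.hudi.RecordLevelIndexSupport
import org.apache.spark.sql.catalyst.expressions.{EqualTo, Expression, FromUnixTime, Literal}

// EqualTo whose left operand is FromUnixTime(...), i.e. not an AttributeReference.
val fromUnixTime = FromUnixTime(Literal(0L), Literal("yyyy-MM-dd HH:mm:ss"), Some(TimeZone.getDefault.getID))
val filter: Expression = EqualTo(fromUnixTime, Literal("2020-01-01 00:10:20"))

// With the null check in place, this returns Option.empty instead of throwing an NPE.
assert(RecordLevelIndexSupport.filterQueryWithRecordKey(filter, Option.empty).isEmpty)
```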

Co-authored-by: Vinaykumar Bhat <vinay@onehouse.ai>
2 people authored and yihua committed May 14, 2024
1 parent 140e483 commit a462742
Showing 2 changed files with 145 additions and 49 deletions.
RecordLevelIndexSupport.scala
@@ -76,46 +76,6 @@ class RecordLevelIndexSupport(spark: SparkSession,
Option.apply(recordKeyOpt.orElse(null))
}

/**
* Matches the configured simple record key with the input attribute name.
* @param attributeName The attribute name provided in the query
* @return true if input attribute name matches the configured simple record key
*/
private def attributeMatchesRecordKey(attributeName: String): Boolean = {
val recordKeyOpt = getRecordKeyConfig
if (recordKeyOpt.isDefined && recordKeyOpt.get == attributeName) {
true
} else {
HoodieMetadataField.RECORD_KEY_METADATA_FIELD.getFieldName == recordKeyOpt.get
}
}

/**
* Returns the attribute and literal pair given the operands of a binary operator. The pair is returned only if one of
* the operand is an attribute and other is literal. In other cases it returns an empty Option.
* @param expression1 - Left operand of the binary operator
* @param expression2 - Right operand of the binary operator
* @return Attribute and literal pair
*/
private def getAttributeLiteralTuple(expression1: Expression, expression2: Expression): Option[(AttributeReference, Literal)] = {
expression1 match {
case attr: AttributeReference => expression2 match {
case literal: Literal =>
Option.apply(attr, literal)
case _ =>
Option.empty
}
case literal: Literal => expression2 match {
case attr: AttributeReference =>
Option.apply(attr, literal)
case _ =>
Option.empty
}
case _ => Option.empty
}

}

/**
* Given query filters, it filters the EqualTo and IN queries on simple record key columns and returns a tuple of
* list of such queries and list of record key literals present in the query.
@@ -130,7 +90,8 @@ class RecordLevelIndexSupport(spark: SparkSession,
var recordKeyQueries: List[Expression] = List.empty
var recordKeys: List[String] = List.empty
for (query <- queryFilters) {
filterQueryWithRecordKey(query).foreach({
val recordKeyOpt = getRecordKeyConfig
RecordLevelIndexSupport.filterQueryWithRecordKey(query, recordKeyOpt).foreach({
case (exp: Expression, recKeys: List[String]) =>
recordKeys = recordKeys ++ recKeys
recordKeyQueries = recordKeyQueries :+ exp
@@ -141,27 +102,43 @@
}
}

/**
* Return true if metadata table is enabled and record index metadata partition is available.
*/
def isIndexAvailable: Boolean = {
metadataConfig.enabled && metaClient.getTableConfig.getMetadataPartitions.contains(HoodieTableMetadataUtil.PARTITION_NAME_RECORD_INDEX)
}
}

object RecordLevelIndexSupport {
/**
* If the input query is an EqualTo or IN query on simple record key columns, the function returns a tuple of
* list of the query and list of record key literals present in the query otherwise returns an empty option.
*
* @param queryFilter The query that need to be filtered.
* @return Tuple of filtered query and list of record key literals that need to be matched
*/
private def filterQueryWithRecordKey(queryFilter: Expression): Option[(Expression, List[String])] = {
def filterQueryWithRecordKey(queryFilter: Expression, recordKeyOpt: Option[String]): Option[(Expression, List[String])] = {
queryFilter match {
case equalToQuery: EqualTo =>
val (attribute, literal) = getAttributeLiteralTuple(equalToQuery.left, equalToQuery.right).orNull
if (attribute != null && attribute.name != null && attributeMatchesRecordKey(attribute.name)) {
Option.apply(equalToQuery, List.apply(literal.value.toString))
val attributeLiteralTuple = getAttributeLiteralTuple(equalToQuery.left, equalToQuery.right).orNull
if (attributeLiteralTuple != null) {
val attribute = attributeLiteralTuple._1
val literal = attributeLiteralTuple._2
if (attribute != null && attribute.name != null && attributeMatchesRecordKey(attribute.name, recordKeyOpt)) {
Option.apply(equalToQuery, List.apply(literal.value.toString))
} else {
Option.empty
}
} else {
Option.empty
}

case inQuery: In =>
var validINQuery = true
inQuery.value match {
case attribute: AttributeReference =>
if (!attributeMatchesRecordKey(attribute.name)) {
if (!attributeMatchesRecordKey(attribute.name, recordKeyOpt)) {
validINQuery = false
}
case _ => validINQuery = false
@@ -181,9 +158,40 @@ class RecordLevelIndexSupport(spark: SparkSession,
}

/**
* Return true if metadata table is enabled and record index metadata partition is available.
* Returns the attribute and literal pair given the operands of a binary operator. The pair is returned only if one of
* the operand is an attribute and other is literal. In other cases it returns an empty Option.
* @param expression1 - Left operand of the binary operator
* @param expression2 - Right operand of the binary operator
* @return Attribute and literal pair
*/
def isIndexAvailable: Boolean = {
metadataConfig.enabled && metaClient.getTableConfig.getMetadataPartitions.contains(HoodieTableMetadataUtil.PARTITION_NAME_RECORD_INDEX)
private def getAttributeLiteralTuple(expression1: Expression, expression2: Expression): Option[(AttributeReference, Literal)] = {
expression1 match {
case attr: AttributeReference => expression2 match {
case literal: Literal =>
Option.apply(attr, literal)
case _ =>
Option.empty
}
case literal: Literal => expression2 match {
case attr: AttributeReference =>
Option.apply(attr, literal)
case _ =>
Option.empty
}
case _ => Option.empty
}
}

/**
* Matches the configured simple record key with the input attribute name.
* @param attributeName The attribute name provided in the query
* @return true if input attribute name matches the configured simple record key
*/
private def attributeMatchesRecordKey(attributeName: String, recordKeyOpt: Option[String]): Boolean = {
if (recordKeyOpt.isDefined && recordKeyOpt.get == attributeName) {
true
} else {
HoodieMetadataField.RECORD_KEY_METADATA_FIELD.getFieldName == recordKeyOpt.get
}
}
}
TestRecordLevelIndexSupport.scala (new file)
@@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.hudi

import org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Expression, FromUnixTime, GreaterThan, In, Literal, Not}
import org.apache.spark.sql.types.StringType
import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
import org.junit.jupiter.api.Test

import java.util.TimeZone

class TestRecordLevelIndexSupport {
@Test
def testFilterQueryWithRecordKey(): Unit = {
// Case 1: EqualTo filters not on simple AttributeReference and non-Literal should return empty result
val fmt = "yyyy-MM-dd HH:mm:ss"
val fromUnixTime = FromUnixTime(Literal(0L), Literal(fmt), Some(TimeZone.getDefault.getID))
var testFilter: Expression = EqualTo(fromUnixTime, Literal("2020-01-01 00:10:20"))
var result = RecordLevelIndexSupport.filterQueryWithRecordKey(testFilter, Option.empty)
assertTrue(result.isEmpty)

// Case 2: EqualTo filters not on Literal and not on simple AttributeReference should return empty result
testFilter = EqualTo(Literal("2020-01-01 00:10:20"), fromUnixTime)
result = RecordLevelIndexSupport.filterQueryWithRecordKey(testFilter, Option.empty)
assertTrue(result.isEmpty)

// Case 3: EqualTo filters on simple AttributeReference and non-Literal should return empty result
testFilter = EqualTo(AttributeReference("_row_key", StringType, nullable = true)(), fromUnixTime)
result = RecordLevelIndexSupport.filterQueryWithRecordKey(testFilter, Option.empty)
assertTrue(result.isEmpty)

// Case 4: EqualTo filters on simple AttributeReference and Literal which should return non-empty result
testFilter = EqualTo(AttributeReference("_row_key", StringType, nullable = true)(), Literal("row1"))
result = RecordLevelIndexSupport.filterQueryWithRecordKey(testFilter, Option.apply(HoodieMetadataField.RECORD_KEY_METADATA_FIELD.getFieldName))
assertTrue(result.isDefined)
assertEquals(result, Option.apply(testFilter, List.apply("row1")))

// case 5: EqualTo on fields other than record key should return empty result
result = RecordLevelIndexSupport.filterQueryWithRecordKey(testFilter, Option.apply("blah"))
assertTrue(result.isEmpty)

// Case 6: In filter on fields other than record key should return empty result
testFilter = In(AttributeReference("_row_key", StringType, nullable = true)(), List.apply(Literal("xyz"), Literal("abc")))
result = RecordLevelIndexSupport.filterQueryWithRecordKey(testFilter, Option.apply("blah"))
assertTrue(result.isEmpty)

// Case 7: In filter on record key should return non-empty result
testFilter = In(AttributeReference("_row_key", StringType, nullable = true)(), List.apply(Literal("xyz"), Literal("abc")))
result = RecordLevelIndexSupport.filterQueryWithRecordKey(testFilter, Option.apply(HoodieMetadataField.RECORD_KEY_METADATA_FIELD.getFieldName))
assertTrue(result.isDefined)

// Case 8: In filter on simple AttributeReference(on record-key) and non-Literal should return empty result
testFilter = In(AttributeReference("_row_key", StringType, nullable = true)(), List.apply(fromUnixTime))
result = RecordLevelIndexSupport.filterQueryWithRecordKey(testFilter, Option.apply(HoodieMetadataField.RECORD_KEY_METADATA_FIELD.getFieldName))
assertTrue(result.isEmpty)

// Case 9: Anything other than EqualTo and In predicate is not supported. Hence it returns empty result
testFilter = Not(In(AttributeReference("_row_key", StringType, nullable = true)(), List.apply(Literal("xyz"), Literal("abc"))))
result = RecordLevelIndexSupport.filterQueryWithRecordKey(testFilter, Option.apply(HoodieMetadataField.RECORD_KEY_METADATA_FIELD.getFieldName))
assertTrue(result.isEmpty)

testFilter = Not(In(AttributeReference("_row_key", StringType, nullable = true)(), List.apply(fromUnixTime)))
result = RecordLevelIndexSupport.filterQueryWithRecordKey(testFilter, Option.apply(HoodieMetadataField.RECORD_KEY_METADATA_FIELD.getFieldName))
assertTrue(result.isEmpty)

testFilter = GreaterThan(AttributeReference("_row_key", StringType, nullable = true)(), Literal("row1"))
result = RecordLevelIndexSupport.filterQueryWithRecordKey(testFilter, Option.apply(HoodieMetadataField.RECORD_KEY_METADATA_FIELD.getFieldName))
assertTrue(result.isEmpty)
}
}
