From a462742835173d443533c2fac62f659a219c103a Mon Sep 17 00:00:00 2001
From: bhat-vinay <152183592+bhat-vinay@users.noreply.github.com>
Date: Tue, 9 Apr 2024 19:14:42 +0530
Subject: [PATCH] [HUDI-7559] [1/n] Fix RecordLevelIndexSupport::filterQueryWithRecordKey (#10947)

RecordLevelIndexSupport::filterQueryWithRecordKey() throws an NPE if the
EqualTo query predicate is not of the form `AttributeReference = Literal`.
This is because RecordLevelIndexSupport::getAttributeLiteralTuple() returns
null in such cases, and that null is then dereferenced unconditionally. This
bug prevented the functional index from being used even when the query
predicate contained the Spark functions on which the functional index is
built, so the column-stats based functional index was not pruning files.

This PR makes the following minor changes:
1. Move some methods in RecordLevelIndexSupport into a companion object to
   make them static (to aid unit testing)
2. Fix filterQueryWithRecordKey() by checking for a null return value from
   the call to getAttributeLiteralTuple()
3. Add unit tests in TestRecordLevelIndexSupport.scala

Co-authored-by: Vinaykumar Bhat
---
 .../apache/hudi/RecordLevelIndexSupport.scala | 106 ++++++++++--------
 .../hudi/TestRecordLevelIndexSupport.scala    |  88 +++++++++++++++
 2 files changed, 145 insertions(+), 49 deletions(-)
 create mode 100644 hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/hudi/TestRecordLevelIndexSupport.scala

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/RecordLevelIndexSupport.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/RecordLevelIndexSupport.scala
index 3580e7ccfe8e9..3a0e3f78e9bc4 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/RecordLevelIndexSupport.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/RecordLevelIndexSupport.scala
@@ -76,46 +76,6 @@ class RecordLevelIndexSupport(spark: SparkSession,
     Option.apply(recordKeyOpt.orElse(null))
   }
 
-  /**
-   * Matches the configured simple record key with the input attribute name.
-   * @param attributeName The attribute name provided in the query
-   * @return true if input attribute name matches the configured simple record key
-   */
-  private def attributeMatchesRecordKey(attributeName: String): Boolean = {
-    val recordKeyOpt = getRecordKeyConfig
-    if (recordKeyOpt.isDefined && recordKeyOpt.get == attributeName) {
-      true
-    } else {
-      HoodieMetadataField.RECORD_KEY_METADATA_FIELD.getFieldName == recordKeyOpt.get
-    }
-  }
-
-  /**
-   * Returns the attribute and literal pair given the operands of a binary operator. The pair is returned only if one of
-   * the operand is an attribute and other is literal. In other cases it returns an empty Option.
-   * @param expression1 - Left operand of the binary operator
-   * @param expression2 - Right operand of the binary operator
-   * @return Attribute and literal pair
-   */
-  private def getAttributeLiteralTuple(expression1: Expression, expression2: Expression): Option[(AttributeReference, Literal)] = {
-    expression1 match {
-      case attr: AttributeReference => expression2 match {
-        case literal: Literal =>
-          Option.apply(attr, literal)
-        case _ =>
-          Option.empty
-      }
-      case literal: Literal => expression2 match {
-        case attr: AttributeReference =>
-          Option.apply(attr, literal)
-        case _ =>
-          Option.empty
-      }
-      case _ => Option.empty
-    }
-
-  }
-
   /**
    * Given query filters, it filters the EqualTo and IN queries on simple record key columns and returns a tuple of
    * list of such queries and list of record key literals present in the query.
@@ -130,7 +90,8 @@
     var recordKeyQueries: List[Expression] = List.empty
     var recordKeys: List[String] = List.empty
     for (query <- queryFilters) {
-      filterQueryWithRecordKey(query).foreach({
+      val recordKeyOpt = getRecordKeyConfig
+      RecordLevelIndexSupport.filterQueryWithRecordKey(query, recordKeyOpt).foreach({
         case (exp: Expression, recKeys: List[String]) =>
           recordKeys = recordKeys ++ recKeys
           recordKeyQueries = recordKeyQueries :+ exp
@@ -141,6 +102,15 @@
     }
   }
 
+  /**
+   * Return true if metadata table is enabled and record index metadata partition is available.
+   */
+  def isIndexAvailable: Boolean = {
+    metadataConfig.enabled && metaClient.getTableConfig.getMetadataPartitions.contains(HoodieTableMetadataUtil.PARTITION_NAME_RECORD_INDEX)
+  }
+}
+
+object RecordLevelIndexSupport {
   /**
    * If the input query is an EqualTo or IN query on simple record key columns, the function returns a tuple of
    * list of the query and list of record key literals present in the query otherwise returns an empty option.
@@ -148,20 +118,27 @@ class RecordLevelIndexSupport(spark: SparkSession,
   * @param queryFilter The query that need to be filtered.
   * @return Tuple of filtered query and list of record key literals that need to be matched
   */
-  private def filterQueryWithRecordKey(queryFilter: Expression): Option[(Expression, List[String])] = {
+  def filterQueryWithRecordKey(queryFilter: Expression, recordKeyOpt: Option[String]): Option[(Expression, List[String])] = {
     queryFilter match {
       case equalToQuery: EqualTo =>
-        val (attribute, literal) = getAttributeLiteralTuple(equalToQuery.left, equalToQuery.right).orNull
-        if (attribute != null && attribute.name != null && attributeMatchesRecordKey(attribute.name)) {
-          Option.apply(equalToQuery, List.apply(literal.value.toString))
+        val attributeLiteralTuple = getAttributeLiteralTuple(equalToQuery.left, equalToQuery.right).orNull
+        if (attributeLiteralTuple != null) {
+          val attribute = attributeLiteralTuple._1
+          val literal = attributeLiteralTuple._2
+          if (attribute != null && attribute.name != null && attributeMatchesRecordKey(attribute.name, recordKeyOpt)) {
+            Option.apply(equalToQuery, List.apply(literal.value.toString))
+          } else {
+            Option.empty
+          }
         } else {
           Option.empty
         }
+
       case inQuery: In =>
         var validINQuery = true
         inQuery.value match {
           case attribute: AttributeReference =>
-            if (!attributeMatchesRecordKey(attribute.name)) {
+            if (!attributeMatchesRecordKey(attribute.name, recordKeyOpt)) {
               validINQuery = false
             }
           case _ => validINQuery = false
@@ -181,9 +158,40 @@
   }
 
   /**
-   * Return true if metadata table is enabled and record index metadata partition is available.
+   * Returns the attribute and literal pair given the operands of a binary operator. The pair is returned only if one of
+   * the operand is an attribute and other is literal. In other cases it returns an empty Option.
+   * @param expression1 - Left operand of the binary operator
+   * @param expression2 - Right operand of the binary operator
+   * @return Attribute and literal pair
    */
-  def isIndexAvailable: Boolean = {
-    metadataConfig.enabled && metaClient.getTableConfig.getMetadataPartitions.contains(HoodieTableMetadataUtil.PARTITION_NAME_RECORD_INDEX)
+  private def getAttributeLiteralTuple(expression1: Expression, expression2: Expression): Option[(AttributeReference, Literal)] = {
+    expression1 match {
+      case attr: AttributeReference => expression2 match {
+        case literal: Literal =>
+          Option.apply(attr, literal)
+        case _ =>
+          Option.empty
+      }
+      case literal: Literal => expression2 match {
+        case attr: AttributeReference =>
+          Option.apply(attr, literal)
+        case _ =>
+          Option.empty
+      }
+      case _ => Option.empty
+    }
+  }
+
+  /**
+   * Matches the configured simple record key with the input attribute name.
+   * @param attributeName The attribute name provided in the query
+   * @return true if input attribute name matches the configured simple record key
+   */
+  private def attributeMatchesRecordKey(attributeName: String, recordKeyOpt: Option[String]): Boolean = {
+    if (recordKeyOpt.isDefined && recordKeyOpt.get == attributeName) {
+      true
+    } else {
+      HoodieMetadataField.RECORD_KEY_METADATA_FIELD.getFieldName == recordKeyOpt.get
+    }
   }
 }
diff --git a/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/hudi/TestRecordLevelIndexSupport.scala b/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/hudi/TestRecordLevelIndexSupport.scala
new file mode 100644
index 0000000000000..d52af12880f33
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/hudi/TestRecordLevelIndexSupport.scala
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Expression, FromUnixTime, GreaterThan, In, Literal, Not}
+import org.apache.spark.sql.types.StringType
+import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
+import org.junit.jupiter.api.Test
+
+import java.util.TimeZone
+
+class TestRecordLevelIndexSupport {
+  @Test
+  def testFilterQueryWithRecordKey(): Unit = {
+    // Case 1: EqualTo filters not on simple AttributeReference and non-Literal should return empty result
+    val fmt = "yyyy-MM-dd HH:mm:ss"
+    val fromUnixTime = FromUnixTime(Literal(0L), Literal(fmt), Some(TimeZone.getDefault.getID))
+    var testFilter: Expression = EqualTo(fromUnixTime, Literal("2020-01-01 00:10:20"))
+    var result = RecordLevelIndexSupport.filterQueryWithRecordKey(testFilter, Option.empty)
+    assertTrue(result.isEmpty)
+
+    // Case 2: EqualTo filters not on Literal and not on simple AttributeReference should return empty result
+    testFilter = EqualTo(Literal("2020-01-01 00:10:20"), fromUnixTime)
+    result = RecordLevelIndexSupport.filterQueryWithRecordKey(testFilter, Option.empty)
+    assertTrue(result.isEmpty)
+
+    // Case 3: EqualTo filters on simple AttributeReference and non-Literal should return empty result
+    testFilter = EqualTo(AttributeReference("_row_key", StringType, nullable = true)(), fromUnixTime)
+    result = RecordLevelIndexSupport.filterQueryWithRecordKey(testFilter, Option.empty)
+    assertTrue(result.isEmpty)
+
+    // Case 4: EqualTo filters on simple AttributeReference and Literal which should return non-empty result
+    testFilter = EqualTo(AttributeReference("_row_key", StringType, nullable = true)(), Literal("row1"))
+    result = RecordLevelIndexSupport.filterQueryWithRecordKey(testFilter, Option.apply(HoodieMetadataField.RECORD_KEY_METADATA_FIELD.getFieldName))
+    assertTrue(result.isDefined)
+    assertEquals(result, Option.apply(testFilter, List.apply("row1")))
+
+    // case 5: EqualTo on fields other than record key should return empty result
+    result = RecordLevelIndexSupport.filterQueryWithRecordKey(testFilter, Option.apply("blah"))
+    assertTrue(result.isEmpty)
+
+    // Case 6: In filter on fields other than record key should return empty result
+    testFilter = In(AttributeReference("_row_key", StringType, nullable = true)(), List.apply(Literal("xyz"), Literal("abc")))
+    result = RecordLevelIndexSupport.filterQueryWithRecordKey(testFilter, Option.apply("blah"))
+    assertTrue(result.isEmpty)
+
+    // Case 7: In filter on record key should return non-empty result
+    testFilter = In(AttributeReference("_row_key", StringType, nullable = true)(), List.apply(Literal("xyz"), Literal("abc")))
+    result = RecordLevelIndexSupport.filterQueryWithRecordKey(testFilter, Option.apply(HoodieMetadataField.RECORD_KEY_METADATA_FIELD.getFieldName))
+    assertTrue(result.isDefined)
+
+    // Case 8: In filter on simple AttributeReference(on record-key) and non-Literal should return empty result
+    testFilter = In(AttributeReference("_row_key", StringType, nullable = true)(), List.apply(fromUnixTime))
+    result = RecordLevelIndexSupport.filterQueryWithRecordKey(testFilter, Option.apply(HoodieMetadataField.RECORD_KEY_METADATA_FIELD.getFieldName))
+    assertTrue(result.isEmpty)
+
+    // Case 9: Anything other than EqualTo and In predicate is not supported. Hence it returns empty result
+    testFilter = Not(In(AttributeReference("_row_key", StringType, nullable = true)(), List.apply(Literal("xyz"), Literal("abc"))))
+    result = RecordLevelIndexSupport.filterQueryWithRecordKey(testFilter, Option.apply(HoodieMetadataField.RECORD_KEY_METADATA_FIELD.getFieldName))
+    assertTrue(result.isEmpty)
+
+    testFilter = Not(In(AttributeReference("_row_key", StringType, nullable = true)(), List.apply(fromUnixTime)))
+    result = RecordLevelIndexSupport.filterQueryWithRecordKey(testFilter, Option.apply(HoodieMetadataField.RECORD_KEY_METADATA_FIELD.getFieldName))
+    assertTrue(result.isEmpty)
+
+    testFilter = GreaterThan(AttributeReference("_row_key", StringType, nullable = true)(), Literal("row1"))
+    result = RecordLevelIndexSupport.filterQueryWithRecordKey(testFilter, Option.apply(HoodieMetadataField.RECORD_KEY_METADATA_FIELD.getFieldName))
+    assertTrue(result.isEmpty)
+  }
+}
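
Reviewer note (not part of the patch): a minimal, self-contained sketch of the behaviour the null check guarantees, calling the now-static API added above. It assumes hudi-spark-common and spark-catalyst are on the classpath; the object name FilterQueryWithRecordKeySketch is hypothetical and for illustration only.

import org.apache.hudi.RecordLevelIndexSupport
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal}
import org.apache.spark.sql.types.StringType

object FilterQueryWithRecordKeySketch {
  def main(args: Array[String]): Unit = {
    // Simple record key configured for the table, as getRecordKeyConfig would return it.
    val recordKey = Option("_row_key")

    // Predicate of the supported form `AttributeReference = Literal` on the record key:
    // the filter is kept and the key literal "row1" is extracted.
    val supported = EqualTo(AttributeReference("_row_key", StringType, nullable = true)(), Literal("row1"))
    println(RecordLevelIndexSupport.filterQueryWithRecordKey(supported, recordKey))

    // Predicate whose operands are two Literals: getAttributeLiteralTuple finds no
    // (attribute, literal) pair, and the added null check turns that into an empty
    // Option instead of the NPE described in the commit message.
    val unsupported = EqualTo(Literal("a"), Literal("b"))
    println(RecordLevelIndexSupport.filterQueryWithRecordKey(unsupported, recordKey))
  }
}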