From 1858fdc8e4c982d5f6c582d7250553c4240605fa Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Thu, 6 Jul 2023 12:41:24 -0700 Subject: [PATCH 1/4] Add UT for file index Signed-off-by: Chen Dai --- .../FlintSparkSkippingFileIndex.scala | 8 +-- .../FlintSparkSkippingFileIndexSuite.scala | 49 +++++++++++++++++++ .../spark/FlintSparkSkippingIndexSuite.scala | 2 +- 3 files changed, 54 insertions(+), 5 deletions(-) create mode 100644 flint/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingFileIndexSuite.scala diff --git a/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingFileIndex.scala b/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingFileIndex.scala index 7df222d8ee..646363e23e 100644 --- a/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingFileIndex.scala +++ b/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingFileIndex.scala @@ -17,10 +17,10 @@ import org.apache.spark.sql.types.StructType * * @param baseFileIndex * original file index - * @param filterByIndex - * pushed down filtering on index data + * @param queryIndex + * query skipping index DF with pushed down filters */ -case class FlintSparkSkippingFileIndex(baseFileIndex: FileIndex, filterByIndex: DataFrame) +case class FlintSparkSkippingFileIndex(baseFileIndex: FileIndex, queryIndex: DataFrame) extends FileIndex { override def listFiles( @@ -28,7 +28,7 @@ case class FlintSparkSkippingFileIndex(baseFileIndex: FileIndex, filterByIndex: dataFilters: Seq[Expression]): Seq[PartitionDirectory] = { val selectedFiles = - filterByIndex.collect + queryIndex.collect .map(_.getString(0)) .toSet diff --git a/flint/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingFileIndexSuite.scala b/flint/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingFileIndexSuite.scala new file mode 100644 index 0000000000..57f755e449 --- /dev/null +++ b/flint/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingFileIndexSuite.scala @@ -0,0 +1,49 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.spark.skipping + +import org.apache.hadoop.fs.{FileStatus, Path} +import org.mockito.ArgumentMatchers.any +import org.mockito.Mockito.when +import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.FILE_PATH_COLUMN +import org.scalatest.matchers.should.Matchers +import org.scalatestplus.mockito.MockitoSugar.mock + +import org.apache.spark.FlintSuite +import org.apache.spark.sql.DataFrame +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.Literal +import org.apache.spark.sql.execution.datasources.{FileIndex, PartitionDirectory} + +class FlintSparkSkippingFileIndexSuite extends FlintSuite with Matchers { + + test("") { + val baseFileIndex = mock[FileIndex] + when(baseFileIndex.listFiles(any(), any())) + .thenReturn(mockPartitions(Map("partition-1" -> Seq("filepath-1")))) + + val queryIndex = mockQueryIndexDf(Seq("filepath-1")) + + val fileIndex = FlintSparkSkippingFileIndex(baseFileIndex, queryIndex) + fileIndex.listFiles(Seq.empty, Seq.empty) shouldBe + mockPartitions(Map("partition-1" -> Seq("filepath-1"))) + } + + private def mockPartitions(partitions: Map[String, Seq[String]]): 
Seq[PartitionDirectory] = { + partitions.map { case (partitionName, filePaths) => + val files = filePaths.map(path => new FileStatus(0, false, 0, 0, 0, new Path(path))) + PartitionDirectory(InternalRow(Literal(partitionName)), files) + }.toSeq + } + + private def mockQueryIndexDf(filePaths: Seq[String]): DataFrame = { + val mySpark = spark + import mySpark.implicits._ + + val columns = Seq(FILE_PATH_COLUMN) + filePaths.toDF(columns: _*) + } +} diff --git a/flint/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSuite.scala b/flint/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSuite.scala index f8d6b67d8b..2128196376 100644 --- a/flint/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSuite.scala +++ b/flint/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSuite.scala @@ -333,7 +333,7 @@ class FlintSparkSkippingIndexSuite // Custom matcher to check if FlintSparkSkippingFileIndex has expected filter condition def hasIndexFilter(expect: Column): Matcher[FlintSparkSkippingFileIndex] = { Matcher { (fileIndex: FlintSparkSkippingFileIndex) => - val plan = fileIndex.filterByIndex.queryExecution.logical + val plan = fileIndex.queryIndex.queryExecution.logical val hasExpectedFilter = plan.find { case Filter(actual, _) => actual.semanticEquals(expect.expr) From 0dc0868e4c0fee195c905c227dca4a12ff375c3c Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Fri, 7 Jul 2023 10:50:56 -0700 Subject: [PATCH 2/4] Add hybrid scan config and IT Signed-off-by: Chen Dai --- .../sql/flint/config/FlintSparkConf.scala | 6 ++ .../ApplyFlintSparkSkippingIndex.scala | 22 +++--- .../FlintSparkSkippingFileIndex.scala | 47 +++++++++---- .../FlintSparkSkippingFileIndexSuite.scala | 70 +++++++++++++------ .../spark/FlintSparkSkippingIndexSuite.scala | 2 +- 5 files changed, 98 insertions(+), 49 deletions(-) diff --git a/flint/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala b/flint/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala index da171eb26e..efa89b1612 100644 --- a/flint/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala +++ b/flint/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala @@ -92,6 +92,10 @@ object FlintSparkConf { val OPTIMIZER_RULE_ENABLED = FlintConfig("spark.flint.optimizer.enabled") .doc("Enable Flint optimizer rule for query rewrite with Flint index") .createWithDefault("true") + + val HYBRID_SCAN_ENABLED = FlintConfig("spark.flint.index.hybridscan.enabled") + .doc("Enable hybrid scan to include latest source data not refreshed to index yet") + .createWithDefault("false") } /** @@ -114,6 +118,8 @@ class FlintSparkConf(properties: JMap[String, String]) extends Serializable { def isOptimizerEnabled: Boolean = OPTIMIZER_RULE_ENABLED.readFrom(reader).toBoolean + def isHybridScanEnabled: Boolean = HYBRID_SCAN_ENABLED.readFrom(reader).toBoolean + /** * spark.sql.session.timeZone */ diff --git a/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/ApplyFlintSparkSkippingIndex.scala b/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/ApplyFlintSparkSkippingIndex.scala index dab01eefc3..b7ea2fa609 100644 --- a/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/ApplyFlintSparkSkippingIndex.scala +++ 
b/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/ApplyFlintSparkSkippingIndex.scala @@ -6,9 +6,9 @@ package org.opensearch.flint.spark.skipping import org.opensearch.flint.spark.FlintSpark -import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.{getSkippingIndexName, FILE_PATH_COLUMN, SKIPPING_INDEX_TYPE} +import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.{getSkippingIndexName, SKIPPING_INDEX_TYPE} -import org.apache.spark.sql.{Column, DataFrame} +import org.apache.spark.sql.DataFrame import org.apache.spark.sql.catalyst.expressions.{And, Predicate} import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan} import org.apache.spark.sql.catalyst.rules.Rule @@ -38,7 +38,7 @@ class ApplyFlintSparkSkippingIndex(flint: FlintSpark) extends Rule[LogicalPlan] val index = flint.describeIndex(indexName) if (index.exists(_.kind == SKIPPING_INDEX_TYPE)) { val skippingIndex = index.get.asInstanceOf[FlintSparkSkippingIndex] - val indexPred = rewriteToIndexPredicate(skippingIndex, condition) + val indexFilter = rewriteToIndexFilter(skippingIndex, condition) /* * Replace original file index with Flint skipping file index: @@ -47,9 +47,9 @@ class ApplyFlintSparkSkippingIndex(flint: FlintSpark) extends Rule[LogicalPlan] * |- HadoopFsRelation * |- FileIndex <== replaced with FlintSkippingFileIndex */ - if (indexPred.isDefined) { - val filterByIndex = buildFilterIndexQuery(skippingIndex, indexPred.get) - val fileIndex = new FlintSparkSkippingFileIndex(location, filterByIndex) + if (indexFilter.isDefined) { + val indexScan = buildIndexScan(skippingIndex) + val fileIndex = FlintSparkSkippingFileIndex(location, indexScan, indexFilter.get) val indexRelation = baseRelation.copy(location = fileIndex)(baseRelation.sparkSession) filter.copy(child = relation.copy(relation = indexRelation)) } else { @@ -60,7 +60,7 @@ class ApplyFlintSparkSkippingIndex(flint: FlintSpark) extends Rule[LogicalPlan] } } - private def rewriteToIndexPredicate( + private def rewriteToIndexFilter( index: FlintSparkSkippingIndex, condition: Predicate): Option[Predicate] = { @@ -71,15 +71,9 @@ class ApplyFlintSparkSkippingIndex(flint: FlintSpark) extends Rule[LogicalPlan] .reduceOption(And(_, _)) } - private def buildFilterIndexQuery( - index: FlintSparkSkippingIndex, - rewrittenPredicate: Predicate): DataFrame = { - - // Get file list based on the rewritten predicates on index data + private def buildIndexScan(index: FlintSparkSkippingIndex): DataFrame = { flint.spark.read .format(FLINT_DATASOURCE) .load(index.name()) - .filter(new Column(rewrittenPredicate)) - .select(FILE_PATH_COLUMN) } } diff --git a/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingFileIndex.scala b/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingFileIndex.scala index 646363e23e..113ad8bf82 100644 --- a/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingFileIndex.scala +++ b/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingFileIndex.scala @@ -6,10 +6,12 @@ package org.opensearch.flint.spark.skipping import org.apache.hadoop.fs.{FileStatus, Path} +import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.FILE_PATH_COLUMN -import org.apache.spark.sql.DataFrame -import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.{Column, DataFrame} +import 
org.apache.spark.sql.catalyst.expressions.{Expression, Predicate} import org.apache.spark.sql.execution.datasources.{FileIndex, PartitionDirectory} +import org.apache.spark.sql.flint.config.FlintSparkConf import org.apache.spark.sql.types.StructType /** @@ -17,25 +19,27 @@ import org.apache.spark.sql.types.StructType * * @param baseFileIndex * original file index - * @param queryIndex + * @param indexScan * query skipping index DF with pushed down filters */ -case class FlintSparkSkippingFileIndex(baseFileIndex: FileIndex, queryIndex: DataFrame) +case class FlintSparkSkippingFileIndex( + baseFileIndex: FileIndex, + indexScan: DataFrame, + indexFilter: Predicate) extends FileIndex { override def listFiles( partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory] = { - val selectedFiles = - queryIndex.collect - .map(_.getString(0)) - .toSet - + // TODO: try to avoid the list call if no hybrid scan val partitions = baseFileIndex.listFiles(partitionFilters, dataFilters) - partitions - .map(p => p.copy(files = p.files.filter(f => isFileNotSkipped(selectedFiles, f)))) - .filter(p => p.files.nonEmpty) + + if (FlintSparkConf().isHybridScanEnabled) { + scanFilesFromIndexAndSource(partitions) + } else { + scanFilesFromIndex(partitions) + } } override def rootPaths: Seq[Path] = baseFileIndex.rootPaths @@ -48,6 +52,25 @@ case class FlintSparkSkippingFileIndex(baseFileIndex: FileIndex, queryIndex: Dat override def partitionSchema: StructType = baseFileIndex.partitionSchema + private def scanFilesFromIndexAndSource( + partitions: Seq[PartitionDirectory]): Seq[PartitionDirectory] = { + Seq.empty + } + + private def scanFilesFromIndex(partitions: Seq[PartitionDirectory]): Seq[PartitionDirectory] = { + val selectedFiles = + indexScan + .filter(new Column(indexFilter)) + .select(FILE_PATH_COLUMN) + .collect + .map(_.getString(0)) + .toSet + + partitions + .map(p => p.copy(files = p.files.filter(f => isFileNotSkipped(selectedFiles, f)))) + .filter(p => p.files.nonEmpty) + } + private def isFileNotSkipped(selectedFiles: Set[String], f: FileStatus) = { selectedFiles.contains(f.getPath.toUri.toString) } diff --git a/flint/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingFileIndexSuite.scala b/flint/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingFileIndexSuite.scala index 57f755e449..8e7b1c2a32 100644 --- a/flint/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingFileIndexSuite.scala +++ b/flint/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingFileIndexSuite.scala @@ -13,37 +13,63 @@ import org.scalatest.matchers.should.Matchers import org.scalatestplus.mockito.MockitoSugar.mock import org.apache.spark.FlintSuite -import org.apache.spark.sql.DataFrame +import org.apache.spark.sql.{Column, DataFrame, Row} import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.Literal +import org.apache.spark.sql.catalyst.expressions.{Literal, Predicate} import org.apache.spark.sql.execution.datasources.{FileIndex, PartitionDirectory} +import org.apache.spark.sql.functions.col +import org.apache.spark.sql.types._ class FlintSparkSkippingFileIndexSuite extends FlintSuite with Matchers { - test("") { - val baseFileIndex = mock[FileIndex] - when(baseFileIndex.listFiles(any(), any())) - .thenReturn(mockPartitions(Map("partition-1" -> Seq("filepath-1")))) - - val queryIndex = 
mockQueryIndexDf(Seq("filepath-1")) - - val fileIndex = FlintSparkSkippingFileIndex(baseFileIndex, queryIndex) - fileIndex.listFiles(Seq.empty, Seq.empty) shouldBe - mockPartitions(Map("partition-1" -> Seq("filepath-1"))) + test("should skip unknown source files in non-hybrid-scan mode") { + assertFlintFileIndex() + .withSourceFiles(Map("partition-1" -> Seq("file-1", "file-2"))) + .withIndexData( + Map((FILE_PATH_COLUMN, StringType), ("year", IntegerType)), + Seq(Row("file-1", 2023), Row("file-2", 2022))) + .withIndexFilter(col("year") === 2023) + .shouldScanSourceFiles(Map("partition-1" -> Seq("file-1"))) } - private def mockPartitions(partitions: Map[String, Seq[String]]): Seq[PartitionDirectory] = { - partitions.map { case (partitionName, filePaths) => - val files = filePaths.map(path => new FileStatus(0, false, 0, 0, 0, new Path(path))) - PartitionDirectory(InternalRow(Literal(partitionName)), files) - }.toSeq + private def assertFlintFileIndex(): AssertionHelper = { + new AssertionHelper } - private def mockQueryIndexDf(filePaths: Seq[String]): DataFrame = { - val mySpark = spark - import mySpark.implicits._ + private class AssertionHelper { + private val baseFileIndex = mock[FileIndex] + private var indexScan: DataFrame = _ + private var indexFilter: Predicate = _ + + def withSourceFiles(partitions: Map[String, Seq[String]]): AssertionHelper = { + when(baseFileIndex.listFiles(any(), any())) + .thenReturn(mockPartitions(partitions)) + this + } + + def withIndexData(columns: Map[String, DataType], data: Seq[Row]): AssertionHelper = { + val schema = StructType(columns.map { case (colName, colType) => + StructField(colName, colType, nullable = false) + }.toSeq) + indexScan = spark.createDataFrame(spark.sparkContext.parallelize(data), schema) + this + } + + def withIndexFilter(pred: Column): AssertionHelper = { + indexFilter = pred.expr.asInstanceOf[Predicate] + this + } + + def shouldScanSourceFiles(partitions: Map[String, Seq[String]]): Unit = { + val fileIndex = FlintSparkSkippingFileIndex(baseFileIndex, indexScan, indexFilter) + fileIndex.listFiles(Seq.empty, Seq.empty) shouldBe mockPartitions(partitions) + } - val columns = Seq(FILE_PATH_COLUMN) - filePaths.toDF(columns: _*) + private def mockPartitions(partitions: Map[String, Seq[String]]): Seq[PartitionDirectory] = { + partitions.map { case (partitionName, filePaths) => + val files = filePaths.map(path => new FileStatus(0, false, 0, 0, 0, new Path(path))) + PartitionDirectory(InternalRow(Literal(partitionName)), files) + }.toSeq + } } } diff --git a/flint/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSuite.scala b/flint/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSuite.scala index 2128196376..9f979805a1 100644 --- a/flint/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSuite.scala +++ b/flint/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSuite.scala @@ -333,7 +333,7 @@ class FlintSparkSkippingIndexSuite // Custom matcher to check if FlintSparkSkippingFileIndex has expected filter condition def hasIndexFilter(expect: Column): Matcher[FlintSparkSkippingFileIndex] = { Matcher { (fileIndex: FlintSparkSkippingFileIndex) => - val plan = fileIndex.queryIndex.queryExecution.logical + val plan = fileIndex.indexScan.queryExecution.logical val hasExpectedFilter = plan.find { case Filter(actual, _) => actual.semanticEquals(expect.expr) From 136de9f05731a56f4ecff9eae8727a50fdb50b03 Mon Sep 17 00:00:00 2001 From: Chen Dai Date: 
Fri, 7 Jul 2023 13:09:25 -0700 Subject: [PATCH 3/4] Implement select file logic for hybrid scan mode Signed-off-by: Chen Dai --- .../FlintSparkSkippingFileIndex.scala | 67 ++++++++++++------ .../FlintSparkSkippingFileIndexSuite.scala | 69 +++++++++++++++++-- 2 files changed, 111 insertions(+), 25 deletions(-) diff --git a/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingFileIndex.scala b/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingFileIndex.scala index 113ad8bf82..0820e505ed 100644 --- a/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingFileIndex.scala +++ b/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingFileIndex.scala @@ -12,6 +12,7 @@ import org.apache.spark.sql.{Column, DataFrame} import org.apache.spark.sql.catalyst.expressions.{Expression, Predicate} import org.apache.spark.sql.execution.datasources.{FileIndex, PartitionDirectory} import org.apache.spark.sql.flint.config.FlintSparkConf +import org.apache.spark.sql.functions.isnull import org.apache.spark.sql.types.StructType /** @@ -32,14 +33,19 @@ case class FlintSparkSkippingFileIndex( partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory] = { - // TODO: try to avoid the list call if no hybrid scan + // TODO: make this listFile call only in hybrid scan mode val partitions = baseFileIndex.listFiles(partitionFilters, dataFilters) + val selectedFiles = + if (FlintSparkConf().isHybridScanEnabled) { + selectFilesFromIndexAndSource(partitions) + } else { + selectFilesFromIndexOnly() + } - if (FlintSparkConf().isHybridScanEnabled) { - scanFilesFromIndexAndSource(partitions) - } else { - scanFilesFromIndex(partitions) - } + // Keep partition files present in selected file list above + partitions + .map(p => p.copy(files = p.files.filter(f => isFileNotSkipped(selectedFiles, f)))) + .filter(p => p.files.nonEmpty) } override def rootPaths: Seq[Path] = baseFileIndex.rootPaths @@ -52,23 +58,42 @@ case class FlintSparkSkippingFileIndex( override def partitionSchema: StructType = baseFileIndex.partitionSchema - private def scanFilesFromIndexAndSource( - partitions: Seq[PartitionDirectory]): Seq[PartitionDirectory] = { - Seq.empty - } - - private def scanFilesFromIndex(partitions: Seq[PartitionDirectory]): Seq[PartitionDirectory] = { - val selectedFiles = - indexScan - .filter(new Column(indexFilter)) - .select(FILE_PATH_COLUMN) - .collect - .map(_.getString(0)) - .toSet + /* + * Left join source partitions and index data to keep unrefreshed source files: + * Express the logic in SQL: + * SELECT left.file_path + * FROM partitions AS left + * LEFT OUTER JOIN indexScan AS right + * ON left.file_path = right.file_path + * WHERE right.file_path IS NULL + * OR [indexFilter] + */ + private def selectFilesFromIndexAndSource(partitions: Seq[PartitionDirectory]): Set[String] = { + val sparkSession = indexScan.sparkSession + import sparkSession.implicits._ partitions - .map(p => p.copy(files = p.files.filter(f => isFileNotSkipped(selectedFiles, f)))) - .filter(p => p.files.nonEmpty) + .flatMap(_.files.map(f => f.getPath.toString)) + .toDF(FILE_PATH_COLUMN) + .join(indexScan, Seq(FILE_PATH_COLUMN), "left") + .filter(isnull(indexScan(FILE_PATH_COLUMN)) || new Column(indexFilter)) + .select(FILE_PATH_COLUMN) + .collect() + .map(_.getString(0)) + .toSet + } + + /* + * Consider file paths in index data alone. 
In this case, index filter can be pushed down + * to index store. + */ + private def selectFilesFromIndexOnly(): Set[String] = { + indexScan + .filter(new Column(indexFilter)) + .select(FILE_PATH_COLUMN) + .collect + .map(_.getString(0)) + .toSet } private def isFileNotSkipped(selectedFiles: Set[String], f: FileStatus) = { diff --git a/flint/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingFileIndexSuite.scala b/flint/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingFileIndexSuite.scala index 8e7b1c2a32..df661cdbcf 100644 --- a/flint/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingFileIndexSuite.scala +++ b/flint/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingFileIndexSuite.scala @@ -17,21 +17,82 @@ import org.apache.spark.sql.{Column, DataFrame, Row} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Literal, Predicate} import org.apache.spark.sql.execution.datasources.{FileIndex, PartitionDirectory} +import org.apache.spark.sql.flint.config.FlintSparkConf.HYBRID_SCAN_ENABLED import org.apache.spark.sql.functions.col import org.apache.spark.sql.types._ class FlintSparkSkippingFileIndexSuite extends FlintSuite with Matchers { - test("should skip unknown source files in non-hybrid-scan mode") { + /** Test source partition data. */ + private val partition1 = "partition-1" -> Seq("file-1", "file-2") + private val partition2 = "partition-2" -> Seq("file-3") + + /** Test index data schema. */ + private val schema = Map((FILE_PATH_COLUMN, StringType), ("year", IntegerType)) + + test("should keep files returned from index") { + assertFlintFileIndex() + .withSourceFiles(Map(partition1)) + .withIndexData(schema, Seq(Row("file-1", 2023), Row("file-2", 2022))) + .withIndexFilter(col("year") === 2023) + .shouldScanSourceFiles(Map("partition-1" -> Seq("file-1"))) + } + + test("should keep files of multiple partitions returned from index") { + assertFlintFileIndex() + .withSourceFiles(Map(partition1, partition2)) + .withIndexData(schema, Seq(Row("file-1", 2023), Row("file-2", 2022), Row("file-3", 2023))) + .withIndexFilter(col("year") === 2023) + .shouldScanSourceFiles(Map("partition-1" -> Seq("file-1"), "partition-2" -> Seq("file-3"))) + } + + test("should skip unrefreshed source files by default") { assertFlintFileIndex() - .withSourceFiles(Map("partition-1" -> Seq("file-1", "file-2"))) + .withSourceFiles(Map(partition1)) .withIndexData( - Map((FILE_PATH_COLUMN, StringType), ("year", IntegerType)), - Seq(Row("file-1", 2023), Row("file-2", 2022))) + schema, + Seq(Row("file-1", 2023)) // file-2 is not refreshed to index yet + ) .withIndexFilter(col("year") === 2023) .shouldScanSourceFiles(Map("partition-1" -> Seq("file-1"))) } + test("should not skip unrefreshed source files in hybrid-scan mode") { + withHybridScanEnabled { + assertFlintFileIndex() + .withSourceFiles(Map(partition1)) + .withIndexData( + schema, + Seq(Row("file-1", 2023)) // file-2 is not refreshed to index yet + ) + .withIndexFilter(col("year") === 2023) + .shouldScanSourceFiles(Map("partition-1" -> Seq("file-1", "file-2"))) + } + } + + test("should not skip unrefreshed source files of multiple partitions in hybrid-scan mode") { + withHybridScanEnabled { + assertFlintFileIndex() + .withSourceFiles(Map(partition1, partition2)) + .withIndexData( + schema, + Seq(Row("file-1", 2023)) // file-2 is not refreshed to 
index yet + ) + .withIndexFilter(col("year") === 2023) + .shouldScanSourceFiles( + Map("partition-1" -> Seq("file-1", "file-2"), "partition-2" -> Seq("file-3"))) + } + } + + private def withHybridScanEnabled(block: => Unit): Unit = { + setFlintSparkConf(HYBRID_SCAN_ENABLED, "true") + try { + block + } finally { + setFlintSparkConf(HYBRID_SCAN_ENABLED, "false") + } + } + private def assertFlintFileIndex(): AssertionHelper = { new AssertionHelper } From a710266a0d77d7c1c122d369846e36bed8f99da1 Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Fri, 7 Jul 2023 14:05:32 -0700 Subject: [PATCH 4/4] Add IT Signed-off-by: Chen Dai --- flint/docs/index.md | 1 + .../FlintSparkSkippingFileIndex.scala | 8 +++--- .../scala/org/apache/spark/FlintSuite.scala | 10 +++++++ .../FlintSparkSkippingFileIndexSuite.scala | 16 +++--------- .../spark/FlintSparkSkippingIndexSuite.scala | 26 +++++++++++++++++++ 5 files changed, 44 insertions(+), 17 deletions(-) diff --git a/flint/docs/index.md b/flint/docs/index.md index 81761d63d4..0a6453d999 100644 --- a/flint/docs/index.md +++ b/flint/docs/index.md @@ -219,6 +219,7 @@ In the index mapping, the `_meta` and `properties`field stores meta and schema i IMMEDIATE(true), WAIT_UNTIL(wait_for)] - `spark.datasource.flint.read.scroll_size`: default value is 100. - `spark.flint.optimizer.enabled`: default is true. +- `spark.flint.index.hybridscan.enabled`: default is false. #### Data Type Mapping diff --git a/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingFileIndex.scala b/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingFileIndex.scala index 0820e505ed..ac954e7bb3 100644 --- a/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingFileIndex.scala +++ b/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingFileIndex.scala @@ -59,12 +59,12 @@ case class FlintSparkSkippingFileIndex( override def partitionSchema: StructType = baseFileIndex.partitionSchema /* - * Left join source partitions and index data to keep unrefreshed source files: + * Left join source partitions and index data to keep unknown source files: * Express the logic in SQL: * SELECT left.file_path * FROM partitions AS left - * LEFT OUTER JOIN indexScan AS right - * ON left.file_path = right.file_path + * LEFT JOIN indexScan AS right + * ON left.file_path = right.file_path * WHERE right.file_path IS NULL * OR [indexFilter] */ @@ -73,7 +73,7 @@ case class FlintSparkSkippingFileIndex( import sparkSession.implicits._ partitions - .flatMap(_.files.map(f => f.getPath.toString)) + .flatMap(_.files.map(f => f.getPath.toUri.toString)) .toDF(FILE_PATH_COLUMN) .join(indexScan, Seq(FILE_PATH_COLUMN), "left") .filter(isnull(indexScan(FILE_PATH_COLUMN)) || new Column(indexFilter)) diff --git a/flint/flint-spark-integration/src/test/scala/org/apache/spark/FlintSuite.scala b/flint/flint-spark-integration/src/test/scala/org/apache/spark/FlintSuite.scala index 451c7e5dd6..ee8a52d968 100644 --- a/flint/flint-spark-integration/src/test/scala/org/apache/spark/FlintSuite.scala +++ b/flint/flint-spark-integration/src/test/scala/org/apache/spark/FlintSuite.scala @@ -10,6 +10,7 @@ import org.opensearch.flint.spark.FlintSparkExtensions import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode import org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation import org.apache.spark.sql.flint.config.FlintConfigEntry +import 
org.apache.spark.sql.flint.config.FlintSparkConf.HYBRID_SCAN_ENABLED import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSparkSession @@ -34,4 +35,13 @@ trait FlintSuite extends SharedSparkSession { protected def setFlintSparkConf[T](config: FlintConfigEntry[T], value: Any): Unit = { spark.conf.set(config.key, value.toString) } + + protected def withHybridScanEnabled(block: => Unit): Unit = { + setFlintSparkConf(HYBRID_SCAN_ENABLED, "true") + try { + block + } finally { + setFlintSparkConf(HYBRID_SCAN_ENABLED, "false") + } + } } diff --git a/flint/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingFileIndexSuite.scala b/flint/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingFileIndexSuite.scala index df661cdbcf..d2ef72158d 100644 --- a/flint/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingFileIndexSuite.scala +++ b/flint/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingFileIndexSuite.scala @@ -17,7 +17,6 @@ import org.apache.spark.sql.{Column, DataFrame, Row} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Literal, Predicate} import org.apache.spark.sql.execution.datasources.{FileIndex, PartitionDirectory} -import org.apache.spark.sql.flint.config.FlintSparkConf.HYBRID_SCAN_ENABLED import org.apache.spark.sql.functions.col import org.apache.spark.sql.types._ @@ -46,7 +45,7 @@ class FlintSparkSkippingFileIndexSuite extends FlintSuite with Matchers { .shouldScanSourceFiles(Map("partition-1" -> Seq("file-1"), "partition-2" -> Seq("file-3"))) } - test("should skip unrefreshed source files by default") { + test("should skip unknown source files by default") { assertFlintFileIndex() .withSourceFiles(Map(partition1)) .withIndexData( @@ -57,7 +56,7 @@ class FlintSparkSkippingFileIndexSuite extends FlintSuite with Matchers { .shouldScanSourceFiles(Map("partition-1" -> Seq("file-1"))) } - test("should not skip unrefreshed source files in hybrid-scan mode") { + test("should not skip unknown source files in hybrid-scan mode") { withHybridScanEnabled { assertFlintFileIndex() .withSourceFiles(Map(partition1)) @@ -70,7 +69,7 @@ class FlintSparkSkippingFileIndexSuite extends FlintSuite with Matchers { } } - test("should not skip unrefreshed source files of multiple partitions in hybrid-scan mode") { + test("should not skip unknown source files of multiple partitions in hybrid-scan mode") { withHybridScanEnabled { assertFlintFileIndex() .withSourceFiles(Map(partition1, partition2)) @@ -84,15 +83,6 @@ class FlintSparkSkippingFileIndexSuite extends FlintSuite with Matchers { } } - private def withHybridScanEnabled(block: => Unit): Unit = { - setFlintSparkConf(HYBRID_SCAN_ENABLED, "true") - try { - block - } finally { - setFlintSparkConf(HYBRID_SCAN_ENABLED, "false") - } - } - private def assertFlintFileIndex(): AssertionHelper = { new AssertionHelper } diff --git a/flint/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSuite.scala b/flint/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSuite.scala index 9f979805a1..cc63b55423 100644 --- a/flint/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSuite.scala +++ b/flint/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSuite.scala @@ -301,6 +301,32 @@ class FlintSparkSkippingIndexSuite 
hasIndexFilter(col("MinMax_age_0") <= 25 && col("MinMax_age_1") >= 25)) } + test("should rewrite applicable query to scan latest source files in hybrid scan mode") { + flint + .skippingIndex() + .onTable(testTable) + .addPartitions("month") + .create() + flint.refreshIndex(testIndex, FULL) + + // Generate a new source file which is not in index data + sql(s""" + | INSERT INTO $testTable + | PARTITION (year=2023, month=4) + | VALUES ('Hello', 35, 'Vancouver') + | """.stripMargin) + + withHybridScanEnabled { + val query = sql(s""" + | SELECT address + | FROM $testTable + | WHERE month = 4 + |""".stripMargin) + + checkAnswer(query, Seq(Row("Seattle"), Row("Vancouver"))) + } + } + test("should return empty if describe index not exist") { flint.describeIndex("non-exist") shouldBe empty }