From 959765ac2da2a346a28bde633c97931e9183811a Mon Sep 17 00:00:00 2001
From: jintao shen
Date: Wed, 30 Oct 2024 10:04:53 -0700
Subject: [PATCH] [Spark] Support OPTIMIZE tbl FULL for clustered table (#3793)

#### Which Delta project/connector is this regarding?
- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description
1. Add new SQL syntax: OPTIMIZE tbl FULL.
2. Implement OPTIMIZE tbl FULL to re-cluster all data in the table.

## How was this patch tested?
New unit tests added.

## Does this PR introduce _any_ user-facing changes?
Yes. Previously, OPTIMIZE on a clustered table would not re-cluster data that had been clustered against different clustering keys. With OPTIMIZE tbl FULL, that data is re-clustered against the new keys.

---
 .../io/delta/sql/parser/DeltaSqlBase.g4 | 5 +-
 .../resources/error/delta-error-classes.json | 6 +
 .../io/delta/sql/parser/DeltaSqlParser.scala | 3 +-
 .../apache/spark/sql/delta/DeltaErrors.scala | 6 +
 .../delta/commands/OptimizeTableCommand.scala | 16 +-
 .../commands/OptimizeTableStrategy.scala | 56 ++--
 .../optimize/ZCubeFileStatsCollector.scala | 9 +-
 .../commands/optimize/ZOrderMetrics.scala | 4 +-
 .../skipping/clustering/ClusteringStats.scala | 7 +-
 .../sql/parser/DeltaSqlParserSuite.scala | 38 ++-
 .../skipping/ClusteredTableTestUtils.scala | 13 +
 .../IncrementalZCubeClusteringSuite.scala | 272 +++++++++++++++++-
 12 files changed, 392 insertions(+), 43 deletions(-)

diff --git a/spark/src/main/antlr4/io/delta/sql/parser/DeltaSqlBase.g4 b/spark/src/main/antlr4/io/delta/sql/parser/DeltaSqlBase.g4
index f55b861164..3cdf60158d 100644
--- a/spark/src/main/antlr4/io/delta/sql/parser/DeltaSqlBase.g4
+++ b/spark/src/main/antlr4/io/delta/sql/parser/DeltaSqlBase.g4
@@ -93,7 +93,7 @@ statement (clusterBySpec | CLUSTER BY NONE) #alterTableClusterBy | ALTER TABLE table=qualifiedName (ALTER | CHANGE) COLUMN? column=qualifiedName SYNC IDENTITY #alterTableSyncIdentity - | OPTIMIZE (path=STRING | table=qualifiedName) + | OPTIMIZE (path=STRING | table=qualifiedName) FULL? (WHERE partitionPredicate=predicateToken)? (zorderSpec)? #optimizeTable | REORG TABLE table=qualifiedName
@@ -237,7 +237,7 @@ nonReserved : VACUUM | USING | INVENTORY | RETAIN | HOURS | DRY | RUN | CONVERT | TO | DELTA | PARTITIONED | BY | DESC | DESCRIBE | LIMIT | DETAIL - | GENERATE | FOR | TABLE | CHECK | EXISTS | OPTIMIZE + | GENERATE | FOR | TABLE | CHECK | EXISTS | OPTIMIZE | FULL | IDENTITY | SYNC | COLUMN | CHANGE | REORG | APPLY | PURGE | UPGRADE | UNIFORM | ICEBERG_COMPAT_VERSION | RESTORE | AS | OF
@@ -275,6 +275,7 @@ EXISTS: 'EXISTS'; FALSE: 'FALSE'; FEATURE: 'FEATURE'; FOR: 'FOR'; +FULL: 'FULL'; GENERATE: 'GENERATE'; HISTORY: 'HISTORY'; HOURS: 'HOURS';

diff --git a/spark/src/main/resources/error/delta-error-classes.json b/spark/src/main/resources/error/delta-error-classes.json
index 4a0e54be6e..016ed77247 100644
--- a/spark/src/main/resources/error/delta-error-classes.json
+++ b/spark/src/main/resources/error/delta-error-classes.json
@@ -1963,6 +1963,12 @@ ], "sqlState" : "0AKDC" }, + "DELTA_OPTIMIZE_FULL_NOT_SUPPORTED" : { + "message" : [ + "OPTIMIZE FULL is only supported for clustered tables with non-empty clustering columns." + ], + "sqlState" : "42601" + }, "DELTA_OVERWRITE_SCHEMA_WITH_DYNAMIC_PARTITION_OVERWRITE" : { "message" : [ "'overwriteSchema' cannot be used in dynamic partition overwrite mode."
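A brief usage sketch of the syntax added above (illustrative only, not part of the patch; it assumes a SparkSession configured with Delta Lake, and the table name `events` and its schema are made up for the example):

```scala
// Illustrative sketch: exercises the OPTIMIZE ... FULL syntax added by this patch.
// `spark` is assumed to be a SparkSession with the Delta SQL extension enabled.
spark.sql("CREATE TABLE events (id INT, ts TIMESTAMP) USING delta CLUSTER BY (id)")
spark.sql("INSERT INTO events VALUES (1, current_timestamp()), (2, current_timestamp())")

// Incremental clustering: only un-clustered files and files already clustered by the
// current clustering columns are rewritten.
spark.sql("OPTIMIZE events")

// After the clustering columns change, plain OPTIMIZE leaves data clustered by the old
// columns untouched; OPTIMIZE ... FULL re-clusters all data against the new columns.
spark.sql("ALTER TABLE events CLUSTER BY (ts)")
spark.sql("OPTIMIZE events FULL")
```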
diff --git a/spark/src/main/scala/io/delta/sql/parser/DeltaSqlParser.scala b/spark/src/main/scala/io/delta/sql/parser/DeltaSqlParser.scala
index 9788879a24..c8da5f89f9 100644
--- a/spark/src/main/scala/io/delta/sql/parser/DeltaSqlParser.scala
+++ b/spark/src/main/scala/io/delta/sql/parser/DeltaSqlParser.scala
@@ -368,7 +368,8 @@ class DeltaSqlAstBuilder extends DeltaSqlBaseBaseVisitor[AnyRef] { OptimizeTableCommand( Option(ctx.path).map(string), Option(ctx.table).map(visitTableIdentifier), - Option(ctx.partitionPredicate).map(extractRawText(_)).toSeq)(interleaveBy) + Option(ctx.partitionPredicate).map(extractRawText(_)).toSeq, + DeltaOptimizeContext(isFull = ctx.FULL != null))(interleaveBy) } /**

diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala
index 82fa0028b5..f1d75b6dff 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala
@@ -3456,6 +3456,12 @@ trait DeltaErrorsBase messageParameters = Array(s"${zOrderBy.map(_.name).mkString(", ")}")) } + def optimizeFullNotSupportedException(): Throwable = { + new DeltaUnsupportedOperationException( + errorClass = "DELTA_OPTIMIZE_FULL_NOT_SUPPORTED", + messageParameters = Array.empty) + } + def alterClusterByNotOnDeltaTableException(): Throwable = { new DeltaAnalysisException( errorClass = "DELTA_ONLY_OPERATION",

diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/OptimizeTableCommand.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/OptimizeTableCommand.scala
index 1d01ec97d9..1fa63e9595 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/OptimizeTableCommand.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/OptimizeTableCommand.scala
@@ -128,8 +128,10 @@ object OptimizeTableCommand { /** * The `optimize` command implementation for Spark SQL. Example SQL: * {{{ - * OPTIMIZE ('/path/to/dir' | delta.table) [WHERE part = 25]; + * OPTIMIZE ('/path/to/dir' | delta.table) [FULL] [WHERE part = 25]; * }}} + * + * Note that the FULL and WHERE clauses are mutually exclusive. */ case class OptimizeTableCommand( override val child: LogicalPlan,
@@ -151,7 +153,8 @@ case class OptimizeTableCommand( throw DeltaErrors.notADeltaTableException(table.deltaLog.dataPath.toString) } - if (ClusteredTableUtils.isSupported(snapshot.protocol)) { + val isClusteredTable = ClusteredTableUtils.isSupported(snapshot.protocol) + if (isClusteredTable) { if (userPartitionPredicates.nonEmpty) { throw DeltaErrors.clusteringWithPartitionPredicatesException(userPartitionPredicates) }
@@ -160,6 +163,11 @@ case class OptimizeTableCommand( } } + lazy val clusteringColumns = ClusteringColumnInfo.extractLogicalNames(snapshot) + if (optimizeContext.isFull && (!isClusteredTable || clusteringColumns.isEmpty)) { + throw DeltaErrors.optimizeFullNotSupportedException() + } + val partitionColumns = snapshot.metadata.partitionColumns // Parse the predicate expression into Catalyst expression and verify only simple filters // on partition columns are present
@@ -199,12 +207,14 @@ case class OptimizeTableCommand( * this threshold will be rewritten by the OPTIMIZE command. If not * specified, [[DeltaSQLConf.DELTA_OPTIMIZE_MAX_DELETED_ROWS_RATIO]] * will be used. This parameter must be set to `0` when [[reorg]] is set. + * @param isFull whether OPTIMIZE FULL is run; applicable only to clustered tables.
*/ case class DeltaOptimizeContext( reorg: Option[DeltaReorgOperation] = None, minFileSize: Option[Long] = None, maxFileSize: Option[Long] = None, - maxDeletedRowsRatio: Option[Double] = None) { + maxDeletedRowsRatio: Option[Double] = None, + isFull: Boolean = false) { if (reorg.nonEmpty) { require( minFileSize.contains(0L) && maxDeletedRowsRatio.contains(0d), diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/OptimizeTableStrategy.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/OptimizeTableStrategy.scala index 884099a5b5..3dbd1558bb 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/OptimizeTableStrategy.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/OptimizeTableStrategy.scala @@ -109,7 +109,7 @@ object OptimizeTableStrategy { zOrderBy: Seq[String]): OptimizeTableStrategy = getMode(snapshot, zOrderBy) match { case OptimizeTableMode.CLUSTERING => ClusteringStrategy( - sparkSession, ClusteringColumnInfo.extractLogicalNames(snapshot)) + sparkSession, ClusteringColumnInfo.extractLogicalNames(snapshot), optimizeContext) case OptimizeTableMode.ZORDER => ZOrderStrategy(sparkSession, zOrderBy) case OptimizeTableMode.COMPACTION => CompactionStrategy(sparkSession, optimizeContext) @@ -188,7 +188,8 @@ case class ZOrderStrategy( /** Implements clustering strategy for clustered tables */ case class ClusteringStrategy( override val sparkSession: SparkSession, - clusteringColumns: Seq[String]) extends OptimizeTableStrategy { + clusteringColumns: Seq[String], + optimizeContext: DeltaOptimizeContext) extends OptimizeTableStrategy { override val optimizeTableMode: OptimizeTableMode.Value = OptimizeTableMode.CLUSTERING @@ -237,9 +238,10 @@ case class ClusteringStrategy( * clustering. The requirements to pick candidate files are: * * 1. Candidate files are either un-clustered (missing clusteringProvider) or the - * clusteringProvider is "liquid". - * 2. Clustered files (clusteringProvider is set) with different clustering columns are skipped. - * When clustering columns are changed, existing clustered data is not re-clustered. + * clusteringProvider is "liquid" when isFull is unset. + * 2. Clustered files with different clustering columns are handled differently based + * on isFull setting: If isFull is unset, existing clustered files with different columns are + * skipped. If isFull is set, all files are considered. * 3. Files that belong to the partial ZCubes are picked. A ZCube is considered as a partial * ZCube if its size is smaller than [[DELTA_OPTIMIZE_CLUSTERING_MIN_CUBE_SIZE]]. * 4. If there is only single ZCUBE with all files are clustered and if all clustered files @@ -247,10 +249,16 @@ case class ClusteringStrategy( */ private def applyMinZCube(files: Seq[AddFile]): Seq[AddFile] = { val targetSize = sparkSession.sessionState.conf.getConf(DELTA_OPTIMIZE_CLUSTERING_MIN_CUBE_SIZE) - // Skip files with from different clusteringProviders or files clustered by a different set - // of clustering columns. - val inputFiles = files.iterator.filter { file => - clusteringStatsCollector.inputStats.updateStats(file) + // Keep all files if isFull is set, otherwise skip files with different clusteringProviders + // or files clustered by a different set of clustering columns. 
+ val (candidateFiles, skippedClusteredFiles) = files.iterator.map { f => + // Note that updateStats is called in this separate map step rather than inside the + // partition lambda: Scala 2.13's Iterator.partition does not invoke the lambda in + // file order, which would violate updateStats' requirement that files arrive ordered + // by ZCUBE id (files have been ordered before calling applyMinZCube). + clusteringStatsCollector.inputStats.updateStats(f) + f + }.partition { file => val sameOrMissingClusteringProvider = file.clusteringProvider.forall(_ == ClusteredTableUtils.clusteringProvider)
@@ -258,18 +266,30 @@ case class ClusteringStrategy( val zCubeInfo = ZCubeInfo.getForFile(file) val unmatchedClusteringColumns = zCubeInfo.exists(_.zOrderBy != clusteringColumns) sameOrMissingClusteringProvider && !unmatchedClusteringColumns - }.map(AddFileWithNumRecords.createFromFile) + } // Skip files that belong to a ZCUBE that is larger than target ZCUBE size. - val smallZCubeFiles = ZCube.filterOutLargeZCubes(inputFiles, targetSize) - - // Skip smallZCubeFiles if they all belong to a single ZCUBE. - ZCube.filterOutSingleZCubes(smallZCubeFiles).map { file => - clusteringStatsCollector.outputStats.updateStats(file.addFile) - file.addFile - }.toSeq + // Note that ZCube.filterOutLargeZCubes requires that clustered files have + // the same clustering columns, so skippedClusteredFiles are not included. + val smallZCubeFiles = ZCube.filterOutLargeZCubes( + candidateFiles.map(AddFileWithNumRecords.createFromFile), targetSize) + + if (optimizeContext.isFull && skippedClusteredFiles.nonEmpty) { + // Clustered files with different clustering columns have to be re-clustered. + val finalFiles = (smallZCubeFiles.map(_.addFile) ++ skippedClusteredFiles).toSeq + finalFiles.map { f => + clusteringStatsCollector.outputStats.updateStats(f) + f + } + } else { + // Skip smallZCubeFiles if they all belong to a single ZCUBE. + ZCube.filterOutSingleZCubes(smallZCubeFiles).map { file => + clusteringStatsCollector.outputStats.updateStats(file.addFile) + file.addFile + }.toSeq + } } /** Metrics for clustering when [[isClusteredTable]] is true. */ private val clusteringStatsCollector: ClusteringStatsCollector = - ClusteringStatsCollector(clusteringColumns) + ClusteringStatsCollector(clusteringColumns, optimizeContext) }

diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/optimize/ZCubeFileStatsCollector.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/optimize/ZCubeFileStatsCollector.scala
index 108a25af8c..660c42ea4a 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/optimize/ZCubeFileStatsCollector.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/optimize/ZCubeFileStatsCollector.scala
@@ -26,8 +26,11 @@ import org.apache.spark.sql.delta.zorder.ZCubeInfo.ZCubeID * calling updateStats on every new file seen. * The number of ZCubes, number of files from matching cubes and number of unoptimized files are * captured here. + * + * @param zOrderBy the Z-ORDER or clustering columns. + * @param isFull whether OPTIMIZE FULL is run; applicable only to clustered tables.
*/ -class ZCubeFileStatsCollector(zOrderBy: Seq[String]) { +class ZCubeFileStatsCollector(zOrderBy: Seq[String], isFull: Boolean) { /** map that holds the file statistics Map("element" -> (number of files, total file size)) */ private var processedZCube: ZCubeID = _ @@ -47,7 +50,9 @@ class ZCubeFileStatsCollector(zOrderBy: Seq[String]) { /** method to update the zCubeFileStats incrementally by file */ def updateStats(file: AddFile): AddFile = { val zCubeInfo = ZCubeInfo.getForFile(file) - if (zCubeInfo.isDefined && zCubeInfo.get.zOrderBy == zOrderBy) { + // Note that clustered files with different clustering columns are considered candidate + // files when isFull is set. + if (zCubeInfo.isDefined && (isFull || zCubeInfo.get.zOrderBy == zOrderBy)) { if (processedZCube != zCubeInfo.get.zCubeID) { processedZCube = zCubeInfo.get.zCubeID numZCubes += 1 diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/optimize/ZOrderMetrics.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/optimize/ZOrderMetrics.scala index 799c7ef7b9..81351026dc 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/optimize/ZOrderMetrics.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/optimize/ZOrderMetrics.scala @@ -22,8 +22,8 @@ package org.apache.spark.sql.delta.commands.optimize class ZOrderMetrics(zOrderBy: Seq[String]) { var strategyName: String = _ - val inputStats = new ZCubeFileStatsCollector(zOrderBy) - val outputStats = new ZCubeFileStatsCollector(zOrderBy) + val inputStats = new ZCubeFileStatsCollector(zOrderBy, isFull = false) + val outputStats = new ZCubeFileStatsCollector(zOrderBy, isFull = false) var numOutputCubes = 0 def getZOrderStats(): ZOrderStats = { diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/skipping/clustering/ClusteringStats.scala b/spark/src/main/scala/org/apache/spark/sql/delta/skipping/clustering/ClusteringStats.scala index 0da2569c82..0bc1e20e50 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/skipping/clustering/ClusteringStats.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/skipping/clustering/ClusteringStats.scala @@ -16,6 +16,7 @@ package org.apache.spark.sql.delta.skipping.clustering +import org.apache.spark.sql.delta.commands.DeltaOptimizeContext import org.apache.spark.sql.delta.commands.optimize.ZCubeFileStatsCollector /** @@ -54,10 +55,10 @@ case class ClusteringStats( /** * A class help collecting ClusteringStats. 
*/ -case class ClusteringStatsCollector(zOrderBy: Seq[String]) { +case class ClusteringStatsCollector(zOrderBy: Seq[String], optimizeContext: DeltaOptimizeContext) { - val inputStats = new ZCubeFileStatsCollector(zOrderBy) - val outputStats = new ZCubeFileStatsCollector(zOrderBy) + val inputStats = new ZCubeFileStatsCollector(zOrderBy, optimizeContext.isFull) + val outputStats = new ZCubeFileStatsCollector(zOrderBy, optimizeContext.isFull) var numOutputZCubes = 0 def getClusteringStats: ClusteringStats = { diff --git a/spark/src/test/scala/io/delta/sql/parser/DeltaSqlParserSuite.scala b/spark/src/test/scala/io/delta/sql/parser/DeltaSqlParserSuite.scala index 4934a1d884..1db2bbd911 100644 --- a/spark/src/test/scala/io/delta/sql/parser/DeltaSqlParserSuite.scala +++ b/spark/src/test/scala/io/delta/sql/parser/DeltaSqlParserSuite.scala @@ -24,7 +24,7 @@ import org.apache.spark.sql.delta.skipping.clustering.temp.ClusterByTransform import org.apache.spark.sql.delta.CloneTableSQLTestUtils import org.apache.spark.sql.delta.DeltaTestUtils.BOOLEAN_DOMAIN import org.apache.spark.sql.delta.{UnresolvedPathBasedDeltaTable, UnresolvedPathBasedTable} -import org.apache.spark.sql.delta.commands.{DescribeDeltaDetailCommand, DescribeDeltaHistory, OptimizeTableCommand, DeltaReorgTable} +import org.apache.spark.sql.delta.commands.{DeltaOptimizeContext, DescribeDeltaDetailCommand, DescribeDeltaHistory, OptimizeTableCommand, DeltaReorgTable} import org.apache.spark.SparkFunSuite import org.apache.spark.sql.catalyst.{TableIdentifier, TimeTravel} import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation, UnresolvedTable} @@ -148,6 +148,31 @@ class DeltaSqlParserSuite extends SparkFunSuite with SQLHelper { assert(parser.parsePlan("OPTIMIZE tbl WHERE part = 1 ZORDER BY (col1, col2.subcol)") === OptimizeTableCommand(None, Some(tblId("tbl")), Seq("part = 1"))( Seq(unresolvedAttr("col1"), unresolvedAttr("col2", "subcol")))) + + // Validate OPTIMIZE works correctly with FULL keyword. 
+ parsedCmd = parser.parsePlan("OPTIMIZE tbl FULL") + assert(parsedCmd === + OptimizeTableCommand(None, Some(tblId("tbl")), Nil, DeltaOptimizeContext(isFull = true))(Nil)) + assert(parsedCmd.asInstanceOf[OptimizeTableCommand].child === + UnresolvedTable(Seq("tbl"), "OPTIMIZE")) + + parsedCmd = parser.parsePlan("OPTIMIZE catalog_foo.db.tbl FULL") + assert(parsedCmd === OptimizeTableCommand( + None, Some(tblId("tbl", "db", "catalog_foo")), Nil, DeltaOptimizeContext(isFull = true))(Nil)) + assert(parsedCmd.asInstanceOf[OptimizeTableCommand].child === + UnresolvedTable(Seq("catalog_foo", "db", "tbl"), "OPTIMIZE")) + + parsedCmd = parser.parsePlan("OPTIMIZE '/path/to/tbl' FULL") + assert(parsedCmd === OptimizeTableCommand( + Some("/path/to/tbl"), None, Nil, DeltaOptimizeContext(isFull = true))(Nil)) + assert(parsedCmd.asInstanceOf[OptimizeTableCommand].child === + UnresolvedPathBasedDeltaTable("/path/to/tbl", Map.empty, "OPTIMIZE")) + + parsedCmd = parser.parsePlan("OPTIMIZE delta.`/path/to/tbl` FULL") + assert(parsedCmd === OptimizeTableCommand( + None, Some(tblId("/path/to/tbl", "delta")), Nil, DeltaOptimizeContext(isFull = true))(Nil)) + assert(parsedCmd.asInstanceOf[OptimizeTableCommand].child === + UnresolvedTable(Seq("delta", "/path/to/tbl"), "OPTIMIZE")) } test("OPTIMIZE command new tokens are non-reserved keywords") { @@ -161,15 +186,18 @@ class DeltaSqlParserSuite extends SparkFunSuite with SQLHelper { assert(parser.parsePlan("OPTIMIZE zorder") === OptimizeTableCommand(None, Some(tblId("zorder")), Nil)(Nil)) + assert(parser.parsePlan("OPTIMIZE full") === + OptimizeTableCommand(None, Some(tblId("full")), Nil)(Nil)) + // Use the new keywords in column name - assert(parser.parsePlan("OPTIMIZE tbl WHERE zorder = 1 and optimize = 2") === + assert(parser.parsePlan("OPTIMIZE tbl WHERE zorder = 1 and optimize = 2 and full = 3") === OptimizeTableCommand(None, Some(tblId("tbl")) - , Seq("zorder = 1 and optimize = 2"))(Nil)) + , Seq("zorder = 1 and optimize = 2 and full = 3"))(Nil)) - assert(parser.parsePlan("OPTIMIZE tbl ZORDER BY (optimize, zorder)") === + assert(parser.parsePlan("OPTIMIZE tbl ZORDER BY (optimize, zorder, full)") === OptimizeTableCommand(None, Some(tblId("tbl")), Nil)( - Seq(unresolvedAttr("optimize"), unresolvedAttr("zorder")))) + Seq(unresolvedAttr("optimize"), unresolvedAttr("zorder"), unresolvedAttr("full")))) } test("DESCRIBE DETAIL command is parsed as expected") { diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/skipping/ClusteredTableTestUtils.scala b/spark/src/test/scala/org/apache/spark/sql/delta/skipping/ClusteredTableTestUtils.scala index 11ec8b5bb0..ed465028fd 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/skipping/ClusteredTableTestUtils.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/skipping/ClusteredTableTestUtils.scala @@ -59,6 +59,19 @@ trait ClusteredTableTestUtilsBase verifyDescribeHistoryOperationParameters(table) } + /** + * Runs optimize full on the table and calls postHook on the metrics. 
+ * + * @param table the name of table + * @param postHook callback triggered with OptimizeMetrics returned by the OPTIMIZE command + */ + def runOptimizeFull(table: String)(postHook: OptimizeMetrics => Unit): Unit = { + postHook(sql(s"OPTIMIZE $table FULL").select($"metrics.*").as[OptimizeMetrics].head()) + + // Verify Delta history operation parameters' clusterBy + verifyDescribeHistoryOperationParameters(table) + } + def verifyClusteringColumnsInDomainMetadata( snapshot: Snapshot, logicalColumnNames: Seq[String]): Unit = { diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/skipping/clustering/IncrementalZCubeClusteringSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/skipping/clustering/IncrementalZCubeClusteringSuite.scala index 36e46355a1..f6f59358b2 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/skipping/clustering/IncrementalZCubeClusteringSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/skipping/clustering/IncrementalZCubeClusteringSuite.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.delta.skipping.clustering // scalastyle:off import.ordering.noEmptyLine import org.apache.spark.sql.delta.skipping.ClusteredTableTestUtilsBase import org.apache.spark.sql.delta.skipping.clustering.{ClusteredTableUtils, ClusteringColumnInfo, ClusteringFileStats, ClusteringStats} -import org.apache.spark.sql.delta.DeltaLog +import org.apache.spark.sql.delta.{DeltaLog, DeltaOperations, DeltaUnsupportedOperationException} import org.apache.spark.sql.delta.actions.AddFile import org.apache.spark.sql.delta.sources.DeltaSQLConf import org.apache.spark.sql.delta.test.DeltaSQLCommandTest @@ -73,17 +73,17 @@ class IncrementalZCubeClusteringSuite extends QueryTest actualMetrics: ClusteringStats, expectedMetrics: ClusteringStats): Unit = { var finalActualMetrics = actualMetrics if (expectedMetrics.inputZCubeFiles.size == SKIP_CHECK_SIZE_VALUE) { - val stats = expectedMetrics.inputZCubeFiles + val stats = finalActualMetrics.inputZCubeFiles finalActualMetrics = finalActualMetrics.copy(inputZCubeFiles = stats.copy(size = SKIP_CHECK_SIZE_VALUE)) } if (expectedMetrics.inputOtherFiles.size == SKIP_CHECK_SIZE_VALUE) { - val stats = expectedMetrics.inputOtherFiles + val stats = finalActualMetrics.inputOtherFiles finalActualMetrics = finalActualMetrics.copy(inputOtherFiles = stats.copy(size = SKIP_CHECK_SIZE_VALUE)) } if (expectedMetrics.mergedFiles.size == SKIP_CHECK_SIZE_VALUE) { - val stats = expectedMetrics.mergedFiles + val stats = finalActualMetrics.mergedFiles finalActualMetrics = finalActualMetrics.copy(mergedFiles = stats.copy(size = SKIP_CHECK_SIZE_VALUE)) } @@ -153,7 +153,7 @@ class IncrementalZCubeClusteringSuite extends QueryTest inputZCubeFiles = ClusteringFileStats(2, SKIP_CHECK_SIZE_VALUE), inputOtherFiles = ClusteringFileStats(4, SKIP_CHECK_SIZE_VALUE), inputNumZCubes = 1, - mergedFiles = ClusteringFileStats(6, SKIP_CHECK_SIZE_VALUE), + mergedFiles = ClusteringFileStats(4, SKIP_CHECK_SIZE_VALUE), numOutputZCubes = 1)) assert(metrics.numFilesRemoved === 4) @@ -230,7 +230,7 @@ class IncrementalZCubeClusteringSuite extends QueryTest inputZCubeFiles = ClusteringFileStats(2, SKIP_CHECK_SIZE_VALUE), inputOtherFiles = ClusteringFileStats(4, SKIP_CHECK_SIZE_VALUE), inputNumZCubes = 1, - mergedFiles = ClusteringFileStats(6, SKIP_CHECK_SIZE_VALUE), + mergedFiles = ClusteringFileStats(4, SKIP_CHECK_SIZE_VALUE), numOutputZCubes = 1)) assert(metrics.numFilesRemoved == 4) assert(metrics.numFilesAdded == 2) @@ -266,7 +266,9 @@ class 
IncrementalZCubeClusteringSuite extends QueryTest actualMetrics = metrics.clusteringStats.get, expectedMetrics = ClusteringStats( inputZCubeFiles = ClusteringFileStats(0, SKIP_CHECK_SIZE_VALUE), - inputOtherFiles = ClusteringFileStats(4, SKIP_CHECK_SIZE_VALUE), + // 8 files: 4 files from previously clustered files with different cluster keys + // and 4 files from newly added 4 un-clustered files. + inputOtherFiles = ClusteringFileStats(8, SKIP_CHECK_SIZE_VALUE), inputNumZCubes = 0, mergedFiles = ClusteringFileStats(4, SKIP_CHECK_SIZE_VALUE), numOutputZCubes = 1)) @@ -281,5 +283,261 @@ class IncrementalZCubeClusteringSuite extends QueryTest } } } + + test("OPTIMIZE FULL - change cluster keys") { + withSQLConf( + SQLConf.MAX_RECORDS_PER_FILE.key -> "2", + // Enable update catalog for verifyClusteringColumns. + DeltaSQLConf.DELTA_UPDATE_CATALOG_ENABLED.key -> "true") { + withClusteredTable( + table = table, + schema = "col1 int, col2 int", + clusterBy = "col1, col2") { + addFiles(table, numFiles = 4) + val files0 = getFiles(table) + assert(files0.size === 4) + // Cluster the table into two ZCUBEs. + runOptimize(table) { metrics => + assert(metrics.clusteringStats.nonEmpty) + validateClusteringMetrics( + actualMetrics = metrics.clusteringStats.get, + expectedMetrics = ClusteringStats( + inputZCubeFiles = ClusteringFileStats(0, SKIP_CHECK_SIZE_VALUE), + inputOtherFiles = ClusteringFileStats(4, SKIP_CHECK_SIZE_VALUE), + inputNumZCubes = 0, + mergedFiles = ClusteringFileStats(4, SKIP_CHECK_SIZE_VALUE), + numOutputZCubes = 1)) + + assert(metrics.numFilesRemoved == 4) + assert(metrics.numFilesAdded == 2) + } + val files1 = getFiles(table) + assert(files1.size === 2) + + addFiles(table, numFiles = 4) + assert(getFiles(table).size == 6) + + // Change the clustering columns and verify files with previous clustering columns + // are not clustered. + sql(s"ALTER TABLE $table CLUSTER BY (col2, col1)") + verifyClusteringColumns(TableIdentifier(table), Seq("col2", "col1")) + + withSQLConf( + // Set an extreme value to make all zcubes unstable. + DeltaSQLConf.DELTA_OPTIMIZE_CLUSTERING_MIN_CUBE_SIZE.key -> Long.MaxValue.toString) { + runOptimize(table) { metrics => + assert(metrics.clusteringStats.nonEmpty) + assert(metrics.numFilesRemoved == 4) + assert(metrics.numFilesAdded == 2) + validateClusteringMetrics( + actualMetrics = metrics.clusteringStats.get, + expectedMetrics = ClusteringStats( + inputZCubeFiles = ClusteringFileStats(0, SKIP_CHECK_SIZE_VALUE), + inputOtherFiles = ClusteringFileStats(6, SKIP_CHECK_SIZE_VALUE), + inputNumZCubes = 0, + mergedFiles = ClusteringFileStats(4, SKIP_CHECK_SIZE_VALUE), + numOutputZCubes = 1)) + } + } + val files2 = getFiles(table) + assert(files2.size === 4) + assert(files2.forall { file => + val zCubeInfo = ZCubeInfo.getForFile(file) + zCubeInfo.nonEmpty + }) + assert(getZCubeIds(table).size == 2) + // validate files clustered to previous clustering columns are not re-clustered. + assert(files2.intersect(files1) === files1) + + // OPTIMIZE FULL should re-cluster previously clustered files. + withSQLConf( + // Force all zcubes stable + DeltaSQLConf.DELTA_OPTIMIZE_CLUSTERING_MIN_CUBE_SIZE.key -> 1.toString) { + runOptimizeFull(table) { metrics => + assert(metrics.clusteringStats.nonEmpty) + // Only files with old cluster keys are rewritten. 
+ assert(metrics.numFilesRemoved == 2) + assert(metrics.numFilesAdded == 2) + + validateClusteringMetrics( + actualMetrics = metrics.clusteringStats.get, + expectedMetrics = ClusteringStats( + inputZCubeFiles = ClusteringFileStats(4, SKIP_CHECK_SIZE_VALUE), + inputOtherFiles = ClusteringFileStats(0, SKIP_CHECK_SIZE_VALUE), + inputNumZCubes = 2, + mergedFiles = ClusteringFileStats(2, SKIP_CHECK_SIZE_VALUE), + numOutputZCubes = 1)) + } + } + // all files have same clustering keys. + assert(getFiles(table).forall { f => + val zCubeInfo = ZCubeInfo.getForFile(f).get + val (_, snapshot) = DeltaLog.forTableWithSnapshot(spark, TableIdentifier(table)) + val clusteringColumns = ClusteringColumnInfo.extractLogicalNames(snapshot) + zCubeInfo.zOrderBy == clusteringColumns + }) + + // Incremental OPTIMIZE to validate no files should be clustered. + withSQLConf( + // Force all zcubes stable + DeltaSQLConf.DELTA_OPTIMIZE_CLUSTERING_MIN_CUBE_SIZE.key -> 1.toString) { + runOptimize(table) { metrics => + assert(metrics.clusteringStats.nonEmpty) + assert(metrics.numFilesRemoved == 0) + validateClusteringMetrics( + actualMetrics = metrics.clusteringStats.get, + expectedMetrics = ClusteringStats( + inputZCubeFiles = ClusteringFileStats(4, SKIP_CHECK_SIZE_VALUE), + inputOtherFiles = ClusteringFileStats(0, SKIP_CHECK_SIZE_VALUE), + inputNumZCubes = 2, + mergedFiles = ClusteringFileStats(0, SKIP_CHECK_SIZE_VALUE), + numOutputZCubes = 0)) + } + } + + // OPTIMIZE FULL again and all clustered files have same clustering columns and + // all ZCUBEs are stable. + withSQLConf( + // Force all zcubes stable + DeltaSQLConf.DELTA_OPTIMIZE_CLUSTERING_MIN_CUBE_SIZE.key -> 1.toString) { + runOptimizeFull(table) { metrics => + assert(metrics.clusteringStats.nonEmpty) + validateClusteringMetrics( + actualMetrics = metrics.clusteringStats.get, + expectedMetrics = ClusteringStats( + inputZCubeFiles = ClusteringFileStats(4, SKIP_CHECK_SIZE_VALUE), + inputOtherFiles = ClusteringFileStats(0, SKIP_CHECK_SIZE_VALUE), + inputNumZCubes = 2, + mergedFiles = ClusteringFileStats(0, SKIP_CHECK_SIZE_VALUE), + numOutputZCubes = 0)) + assert(metrics.numFilesRemoved == 0) + assert(metrics.numFilesAdded == 0) + } + } + } + } + } + + test("OPTIMIZE FULL - change clustering provider") { + withSQLConf( + SQLConf.MAX_RECORDS_PER_FILE.key -> "2", + // Enable update catalog for verifyClusteringColumns. + DeltaSQLConf.DELTA_UPDATE_CATALOG_ENABLED.key -> "true") { + withClusteredTable( + table = table, + schema = "col1 int, col2 int", + clusterBy = "col1, col2") { + addFiles(table, numFiles = 4) + val files0 = getFiles(table) + assert(files0.size === 4) + // Cluster the table into two ZCUBEs. + runOptimize(table) { metrics => + assert(metrics.clusteringStats.nonEmpty) + validateClusteringMetrics( + actualMetrics = metrics.clusteringStats.get, + expectedMetrics = ClusteringStats( + inputZCubeFiles = ClusteringFileStats(0, SKIP_CHECK_SIZE_VALUE), + inputOtherFiles = ClusteringFileStats(4, SKIP_CHECK_SIZE_VALUE), + inputNumZCubes = 0, + mergedFiles = ClusteringFileStats(4, SKIP_CHECK_SIZE_VALUE), + numOutputZCubes = 1)) + + assert(metrics.numFilesRemoved == 4) + assert(metrics.numFilesAdded == 2) + } + var files1 = getFiles(table) + assert(files1.size === 2) + for (f <- files1) { + assert(f.clusteringProvider.contains(ClusteredTableUtils.clusteringProvider)) + } + // Change the clusteringProvider and verify files with different clusteringProvider + // are not clustered. 
+ val (deltaLog, _) = DeltaLog.forTableWithSnapshot(spark, TableIdentifier(table)) + val txn = deltaLog.startTransaction(None) + files1 = files1.map(f => f.copy(clusteringProvider = Some("customProvider"))) + txn.commit(files1.toIndexedSeq, DeltaOperations.ManualUpdate) + files1 = getFiles(table) + assert(files1.size === 2) + for (f <- files1) { + assert(f.clusteringProvider.contains("customProvider")) + } + + addFiles(table, numFiles = 4) + assert(getFiles(table).size == 6) + + withSQLConf( + // Set an extreme value to make all zcubes unstable. + DeltaSQLConf.DELTA_OPTIMIZE_CLUSTERING_MIN_CUBE_SIZE.key -> Long.MaxValue.toString) { + runOptimize(table) { metrics => + assert(metrics.clusteringStats.nonEmpty) + assert(metrics.numFilesRemoved == 4) + assert(metrics.numFilesAdded == 2) + validateClusteringMetrics( + actualMetrics = metrics.clusteringStats.get, + expectedMetrics = ClusteringStats( + inputZCubeFiles = ClusteringFileStats(2, SKIP_CHECK_SIZE_VALUE), + inputOtherFiles = ClusteringFileStats(4, SKIP_CHECK_SIZE_VALUE), + inputNumZCubes = 1, + mergedFiles = ClusteringFileStats(4, SKIP_CHECK_SIZE_VALUE), + numOutputZCubes = 1)) + } + } + val files2 = getFiles(table) + assert(files2.size === 4) + assert(files2.forall { file => + val zCubeInfo = ZCubeInfo.getForFile(file) + zCubeInfo.nonEmpty + }) + assert(getZCubeIds(table).size == 2) + // validate files with different clusteringProvider are not re-clustered. + assert(files2.intersect(files1) === files1) + + // OPTIMIZE FULL should re-cluster previously clustered files. + withSQLConf( + // Force all zcubes stable + DeltaSQLConf.DELTA_OPTIMIZE_CLUSTERING_MIN_CUBE_SIZE.key -> 1.toString) { + runOptimizeFull(table) { metrics => + assert(metrics.clusteringStats.nonEmpty) + // Only files with old cluster keys are rewritten. + assert(metrics.numFilesRemoved == 2) + assert(metrics.numFilesAdded == 2) + + validateClusteringMetrics( + actualMetrics = metrics.clusteringStats.get, + expectedMetrics = ClusteringStats( + inputZCubeFiles = ClusteringFileStats(4, SKIP_CHECK_SIZE_VALUE), + inputOtherFiles = ClusteringFileStats(0, SKIP_CHECK_SIZE_VALUE), + inputNumZCubes = 2, + mergedFiles = ClusteringFileStats(2, SKIP_CHECK_SIZE_VALUE), + numOutputZCubes = 1)) + } + } + // all files have same clustering provider. + assert(getFiles(table).forall { f => + f.clusteringProvider.contains(ClusteredTableUtils.clusteringProvider) + }) + } + } + } + + // Test to validate OPTIMIZE FULL is only applied to a clustered table with non-empty clustering + // columns. + test("OPTIMIZE FULL - error cases") { + withTable(table) { + sql(s"CREATE TABLE $table(col1 INT, col2 INT, col3 INT) using delta") + val e = intercept[DeltaUnsupportedOperationException] { + sql(s"OPTIMIZE $table FULL") + } + checkError(e, "DELTA_OPTIMIZE_FULL_NOT_SUPPORTED") + } + + withClusteredTable(table, "col1 INT, col2 INT, col3 INT", "col1") { + sql(s"ALTER TABLE $table CLUSTER BY NONE") + val e = intercept[DeltaUnsupportedOperationException] { + sql(s"OPTIMIZE $table FULL") + } + checkError(e, "DELTA_OPTIMIZE_FULL_NOT_SUPPORTED") + } + } }
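For readers of the ClusteringStrategy change above, a minimal self-contained sketch of the candidate-selection rule that applyMinZCube implements with the new isFull flag. The names FileInfo and selectFilesToRewrite are illustrative stand-ins for Delta's AddFile and ZCubeInfo, and the ZCube-size filtering (filterOutLargeZCubes / filterOutSingleZCubes) is intentionally omitted; this is a sketch of the idea, not the Delta implementation.

```scala
// Simplified sketch (Scala 3 / REPL style; wrap in an object for Scala 2).
final case class FileInfo(
    path: String,
    clusteringProvider: Option[String],  // None => the file was never clustered
    clusteredBy: Option[Seq[String]])    // clustering columns recorded for the file's ZCube

def selectFilesToRewrite(
    files: Seq[FileInfo],
    currentClusteringColumns: Seq[String],
    isFull: Boolean): Seq[FileInfo] = {
  // A file "matches" if it is un-clustered, or was clustered by the same provider
  // ("liquid") using the current clustering columns.
  val (matching, clusteredDifferently) = files.partition { f =>
    val sameOrMissingProvider = f.clusteringProvider.forall(_ == "liquid")
    val sameColumns = f.clusteredBy.forall(_ == currentClusteringColumns)
    sameOrMissingProvider && sameColumns
  }
  // Incremental OPTIMIZE rewrites only the matching files; OPTIMIZE FULL additionally
  // pulls in files clustered by a different provider or different columns.
  if (isFull) matching ++ clusteredDifferently else matching
}

// Example: two files clustered by the old key order and one new un-clustered file.
val files = Seq(
  FileInfo("f1", Some("liquid"), Some(Seq("col1", "col2"))),
  FileInfo("f2", Some("liquid"), Some(Seq("col1", "col2"))),
  FileInfo("f3", None, None))

selectFilesToRewrite(files, Seq("col2", "col1"), isFull = false).map(_.path)  // Seq("f3")
selectFilesToRewrite(files, Seq("col2", "col1"), isFull = true).map(_.path)   // Seq("f3", "f1", "f2")
```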