[Spark] Support OPTIMIZE tbl FULL for clustered table (#3793)

#### Which Delta project/connector is this regarding?

- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

1. Added new SQL syntax: `OPTIMIZE tbl FULL` (a usage sketch follows below).
2. Implemented `OPTIMIZE tbl FULL` to re-cluster all data in a clustered table.
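
A minimal usage sketch of the new syntax (the table name `events` and the path are placeholders, not taken from this PR; assumes a Spark session with the Delta SQL extensions enabled):

```scala
// FULL may follow either a table identifier or a path-based reference.
spark.sql("OPTIMIZE events FULL")
spark.sql("OPTIMIZE catalog_foo.db.events FULL")
spark.sql("OPTIMIZE delta.`/path/to/tbl` FULL")

// FULL and WHERE are effectively mutually exclusive: OPTIMIZE FULL requires a
// clustered table, and OPTIMIZE on a clustered table rejects partition predicates.
```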

## How was this patch tested?

New unit tests added.

## Does this PR introduce _any_ user-facing changes?

Yes.
Previously, a clustered table would not re-cluster data that had been clustered against a different set of clustering keys. With `OPTIMIZE tbl FULL`, that data is re-clustered against the new keys (see the sketch below).
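
To make the difference concrete, a hedged sketch (hypothetical table and column names) of changing the clustering keys and then re-clustering everything:

```scala
// Hypothetical scenario: data is first clustered by `region`, then the keys change to `id`.
spark.sql("CREATE TABLE events (id BIGINT, region STRING) USING DELTA CLUSTER BY (region)")
spark.sql("INSERT INTO events SELECT id, 'us-east' FROM range(1000000)")
spark.sql("OPTIMIZE events")                     // clusters data by `region`

spark.sql("ALTER TABLE events CLUSTER BY (id)")  // change the clustering columns
spark.sql("OPTIMIZE events")                     // incremental: files clustered by `region` are skipped
spark.sql("OPTIMIZE events FULL")                // re-clusters ALL data by `id`
```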
dabao521 authored Oct 30, 2024
1 parent 0c916e0 commit 959765a
Showing 12 changed files with 392 additions and 43 deletions.
5 changes: 3 additions & 2 deletions spark/src/main/antlr4/io/delta/sql/parser/DeltaSqlBase.g4
@@ -93,7 +93,7 @@ statement
(clusterBySpec | CLUSTER BY NONE) #alterTableClusterBy
| ALTER TABLE table=qualifiedName
(ALTER | CHANGE) COLUMN? column=qualifiedName SYNC IDENTITY #alterTableSyncIdentity
| OPTIMIZE (path=STRING | table=qualifiedName)
| OPTIMIZE (path=STRING | table=qualifiedName) FULL?
(WHERE partitionPredicate=predicateToken)?
(zorderSpec)? #optimizeTable
| REORG TABLE table=qualifiedName
@@ -237,7 +237,7 @@ nonReserved
: VACUUM | USING | INVENTORY | RETAIN | HOURS | DRY | RUN
| CONVERT | TO | DELTA | PARTITIONED | BY
| DESC | DESCRIBE | LIMIT | DETAIL
| GENERATE | FOR | TABLE | CHECK | EXISTS | OPTIMIZE
| GENERATE | FOR | TABLE | CHECK | EXISTS | OPTIMIZE | FULL
| IDENTITY | SYNC | COLUMN | CHANGE
| REORG | APPLY | PURGE | UPGRADE | UNIFORM | ICEBERG_COMPAT_VERSION
| RESTORE | AS | OF
@@ -275,6 +275,7 @@ EXISTS: 'EXISTS';
FALSE: 'FALSE';
FEATURE: 'FEATURE';
FOR: 'FOR';
FULL: 'FULL';
GENERATE: 'GENERATE';
HISTORY: 'HISTORY';
HOURS: 'HOURS';
6 changes: 6 additions & 0 deletions spark/src/main/resources/error/delta-error-classes.json
@@ -1963,6 +1963,12 @@
],
"sqlState" : "0AKDC"
},
"DELTA_OPTIMIZE_FULL_NOT_SUPPORTED" : {
"message" : [
"OPTIMIZE FULL is only supported for clustered tables with non-empty clustering columns."
],
"sqlState" : "42601"
},
"DELTA_OVERWRITE_SCHEMA_WITH_DYNAMIC_PARTITION_OVERWRITE" : {
"message" : [
"'overwriteSchema' cannot be used in dynamic partition overwrite mode."
@@ -368,7 +368,8 @@ class DeltaSqlAstBuilder extends DeltaSqlBaseBaseVisitor[AnyRef] {
OptimizeTableCommand(
Option(ctx.path).map(string),
Option(ctx.table).map(visitTableIdentifier),
Option(ctx.partitionPredicate).map(extractRawText(_)).toSeq)(interleaveBy)
Option(ctx.partitionPredicate).map(extractRawText(_)).toSeq,
DeltaOptimizeContext(isFull = ctx.FULL != null))(interleaveBy)
}

/**
@@ -3456,6 +3456,12 @@ trait DeltaErrorsBase
messageParameters = Array(s"${zOrderBy.map(_.name).mkString(", ")}"))
}

def optimizeFullNotSupportedException(): Throwable = {
new DeltaUnsupportedOperationException(
errorClass = "DELTA_OPTIMIZE_FULL_NOT_SUPPORTED",
messageParameters = Array.empty)
}

def alterClusterByNotOnDeltaTableException(): Throwable = {
new DeltaAnalysisException(
errorClass = "DELTA_ONLY_OPERATION",
@@ -128,8 +128,10 @@ object OptimizeTableCommand {
/**
* The `optimize` command implementation for Spark SQL. Example SQL:
* {{{
* OPTIMIZE ('/path/to/dir' | delta.table) [WHERE part = 25];
* OPTIMIZE ('/path/to/dir' | delta.table) [WHERE part = 25] [FULL];
* }}}
*
 * Note: FULL and WHERE clauses are mutually exclusive.
*/
case class OptimizeTableCommand(
override val child: LogicalPlan,
@@ -151,7 +153,8 @@ case class OptimizeTableCommand(
throw DeltaErrors.notADeltaTableException(table.deltaLog.dataPath.toString)
}

if (ClusteredTableUtils.isSupported(snapshot.protocol)) {
val isClusteredTable = ClusteredTableUtils.isSupported(snapshot.protocol)
if (isClusteredTable) {
if (userPartitionPredicates.nonEmpty) {
throw DeltaErrors.clusteringWithPartitionPredicatesException(userPartitionPredicates)
}
@@ -160,6 +163,11 @@
}
}

lazy val clusteringColumns = ClusteringColumnInfo.extractLogicalNames(snapshot)
if (optimizeContext.isFull && (!isClusteredTable || clusteringColumns.isEmpty)) {
throw DeltaErrors.optimizeFullNotSupportedException()
}

val partitionColumns = snapshot.metadata.partitionColumns
// Parse the predicate expression into Catalyst expression and verify only simple filters
// on partition columns are present
@@ -199,12 +207,14 @@ case class OptimizeTableCommand(
* this threshold will be rewritten by the OPTIMIZE command. If not
* specified, [[DeltaSQLConf.DELTA_OPTIMIZE_MAX_DELETED_ROWS_RATIO]]
* will be used. This parameter must be set to `0` when [[reorg]] is set.
* @param isFull whether OPTIMIZE FULL is run. This is only for clustered tables.
*/
case class DeltaOptimizeContext(
reorg: Option[DeltaReorgOperation] = None,
minFileSize: Option[Long] = None,
maxFileSize: Option[Long] = None,
maxDeletedRowsRatio: Option[Double] = None) {
maxDeletedRowsRatio: Option[Double] = None,
isFull: Boolean = false) {
if (reorg.nonEmpty) {
require(
minFileSize.contains(0L) && maxDeletedRowsRatio.contains(0d),
@@ -109,7 +109,7 @@ object OptimizeTableStrategy {
zOrderBy: Seq[String]): OptimizeTableStrategy = getMode(snapshot, zOrderBy) match {
case OptimizeTableMode.CLUSTERING =>
ClusteringStrategy(
sparkSession, ClusteringColumnInfo.extractLogicalNames(snapshot))
sparkSession, ClusteringColumnInfo.extractLogicalNames(snapshot), optimizeContext)
case OptimizeTableMode.ZORDER => ZOrderStrategy(sparkSession, zOrderBy)
case OptimizeTableMode.COMPACTION =>
CompactionStrategy(sparkSession, optimizeContext)
@@ -188,7 +188,8 @@ case class ZOrderStrategy(
/** Implements clustering strategy for clustered tables */
case class ClusteringStrategy(
override val sparkSession: SparkSession,
clusteringColumns: Seq[String]) extends OptimizeTableStrategy {
clusteringColumns: Seq[String],
optimizeContext: DeltaOptimizeContext) extends OptimizeTableStrategy {

override val optimizeTableMode: OptimizeTableMode.Value = OptimizeTableMode.CLUSTERING

@@ -237,39 +238,58 @@
* clustering. The requirements to pick candidate files are:
*
* 1. Candidate files are either un-clustered (missing clusteringProvider) or the
* clusteringProvider is "liquid".
* 2. Clustered files (clusteringProvider is set) with different clustering columns are skipped.
* When clustering columns are changed, existing clustered data is not re-clustered.
* clusteringProvider is "liquid" when isFull is unset.
* 2. Clustered files with different clustering columns are handled differently based
* on isFull setting: If isFull is unset, existing clustered files with different columns are
* skipped. If isFull is set, all files are considered.
* 3. Files that belong to the partial ZCubes are picked. A ZCube is considered as a partial
* ZCube if its size is smaller than [[DELTA_OPTIMIZE_CLUSTERING_MIN_CUBE_SIZE]].
 * 4. If there is only a single ZCube, all files in it are clustered, and all clustered files
 * belong to that ZCube, then all files are filtered out.
*/
private def applyMinZCube(files: Seq[AddFile]): Seq[AddFile] = {
val targetSize = sparkSession.sessionState.conf.getConf(DELTA_OPTIMIZE_CLUSTERING_MIN_CUBE_SIZE)
// Skip files with different clusteringProviders or files clustered by a different set
// of clustering columns.
val inputFiles = files.iterator.filter { file =>
clusteringStatsCollector.inputStats.updateStats(file)
// Keep all files if isFull is set, otherwise skip files with different clusteringProviders
// or files clustered by a different set of clustering columns.
val (candidateFiles, skippedClusteredFiles) = files.iterator.map { f =>
// Note that updateStats is moved out of the Iterator.partition lambda because
// Scala 2.13 does not invoke the lambda in file order, which would violate
// updateStats' requirement that files be ordered by ZCUBE id (files have been
// ordered before calling applyMinZCube).
clusteringStatsCollector.inputStats.updateStats(f)
f
}.partition { file =>
val sameOrMissingClusteringProvider =
file.clusteringProvider.forall(_ == ClusteredTableUtils.clusteringProvider)

// If clustered before, remove those with different clustering columns.
val zCubeInfo = ZCubeInfo.getForFile(file)
val unmatchedClusteringColumns = zCubeInfo.exists(_.zOrderBy != clusteringColumns)
sameOrMissingClusteringProvider && !unmatchedClusteringColumns
}.map(AddFileWithNumRecords.createFromFile)
}
// Skip files that belong to a ZCUBE that is larger than target ZCUBE size.
val smallZCubeFiles = ZCube.filterOutLargeZCubes(inputFiles, targetSize)

// Skip smallZCubeFiles if they all belong to a single ZCUBE.
ZCube.filterOutSingleZCubes(smallZCubeFiles).map { file =>
clusteringStatsCollector.outputStats.updateStats(file.addFile)
file.addFile
}.toSeq
// Note that ZCube.filterOutLargeZCubes requires clustered files have
// the same clustering columns, so skippedClusteredFiles are not included.
val smallZCubeFiles = ZCube.filterOutLargeZCubes(
candidateFiles.map(AddFileWithNumRecords.createFromFile), targetSize)

if (optimizeContext.isFull && skippedClusteredFiles.nonEmpty) {
// Clustered files with different clustering columns have to be re-clustered.
val finalFiles = (smallZCubeFiles.map(_.addFile) ++ skippedClusteredFiles).toSeq
finalFiles.map { f =>
clusteringStatsCollector.outputStats.updateStats(f)
f
}
} else {
// Skip smallZCubeFiles if they all belong to a single ZCUBE.
ZCube.filterOutSingleZCubes(smallZCubeFiles).map { file =>
clusteringStatsCollector.outputStats.updateStats(file.addFile)
file.addFile
}.toSeq
}
}

/** Metrics for clustering when [[isClusteredTable]] is true. */
private val clusteringStatsCollector: ClusteringStatsCollector =
ClusteringStatsCollector(clusteringColumns)
ClusteringStatsCollector(clusteringColumns, optimizeContext)
}
@@ -26,8 +26,11 @@ import org.apache.spark.sql.delta.zorder.ZCubeInfo.ZCubeID
* calling updateStats on every new file seen.
* The number of ZCubes, number of files from matching cubes and number of unoptimized files are
* captured here.
*
* @param zOrderBy zOrder or clustering columns.
* @param isFull whether OPTIMIZE FULL is run. This is only for clustered tables.
*/
class ZCubeFileStatsCollector(zOrderBy: Seq[String]) {
class ZCubeFileStatsCollector(zOrderBy: Seq[String], isFull: Boolean) {

/** map that holds the file statistics Map("element" -> (number of files, total file size)) */
private var processedZCube: ZCubeID = _
@@ -47,7 +50,9 @@ class ZCubeFileStatsCollector(zOrderBy: Seq[String]) {
/** method to update the zCubeFileStats incrementally by file */
def updateStats(file: AddFile): AddFile = {
val zCubeInfo = ZCubeInfo.getForFile(file)
if (zCubeInfo.isDefined && zCubeInfo.get.zOrderBy == zOrderBy) {
// Note that clustered files with different clustering columns are considered candidate
// files when isFull is set.
if (zCubeInfo.isDefined && (isFull || zCubeInfo.get.zOrderBy == zOrderBy)) {
if (processedZCube != zCubeInfo.get.zCubeID) {
processedZCube = zCubeInfo.get.zCubeID
numZCubes += 1
@@ -22,8 +22,8 @@ package org.apache.spark.sql.delta.commands.optimize
class ZOrderMetrics(zOrderBy: Seq[String]) {

var strategyName: String = _
val inputStats = new ZCubeFileStatsCollector(zOrderBy)
val outputStats = new ZCubeFileStatsCollector(zOrderBy)
val inputStats = new ZCubeFileStatsCollector(zOrderBy, isFull = false)
val outputStats = new ZCubeFileStatsCollector(zOrderBy, isFull = false)
var numOutputCubes = 0

def getZOrderStats(): ZOrderStats = {
@@ -16,6 +16,7 @@

package org.apache.spark.sql.delta.skipping.clustering

import org.apache.spark.sql.delta.commands.DeltaOptimizeContext
import org.apache.spark.sql.delta.commands.optimize.ZCubeFileStatsCollector

/**
@@ -54,10 +55,10 @@ case class ClusteringStats(
/**
 * A class that helps collect ClusteringStats.
*/
case class ClusteringStatsCollector(zOrderBy: Seq[String]) {
case class ClusteringStatsCollector(zOrderBy: Seq[String], optimizeContext: DeltaOptimizeContext) {

val inputStats = new ZCubeFileStatsCollector(zOrderBy)
val outputStats = new ZCubeFileStatsCollector(zOrderBy)
val inputStats = new ZCubeFileStatsCollector(zOrderBy, optimizeContext.isFull)
val outputStats = new ZCubeFileStatsCollector(zOrderBy, optimizeContext.isFull)
var numOutputZCubes = 0

def getClusteringStats: ClusteringStats = {
38 changes: 33 additions & 5 deletions spark/src/test/scala/io/delta/sql/parser/DeltaSqlParserSuite.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.delta.skipping.clustering.temp.ClusterByTransform
import org.apache.spark.sql.delta.CloneTableSQLTestUtils
import org.apache.spark.sql.delta.DeltaTestUtils.BOOLEAN_DOMAIN
import org.apache.spark.sql.delta.{UnresolvedPathBasedDeltaTable, UnresolvedPathBasedTable}
import org.apache.spark.sql.delta.commands.{DescribeDeltaDetailCommand, DescribeDeltaHistory, OptimizeTableCommand, DeltaReorgTable}
import org.apache.spark.sql.delta.commands.{DeltaOptimizeContext, DescribeDeltaDetailCommand, DescribeDeltaHistory, OptimizeTableCommand, DeltaReorgTable}
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.{TableIdentifier, TimeTravel}
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation, UnresolvedTable}
@@ -148,6 +148,31 @@ class DeltaSqlParserSuite extends SparkFunSuite with SQLHelper {
assert(parser.parsePlan("OPTIMIZE tbl WHERE part = 1 ZORDER BY (col1, col2.subcol)") ===
OptimizeTableCommand(None, Some(tblId("tbl")), Seq("part = 1"))(
Seq(unresolvedAttr("col1"), unresolvedAttr("col2", "subcol"))))

// Validate OPTIMIZE works correctly with FULL keyword.
parsedCmd = parser.parsePlan("OPTIMIZE tbl FULL")
assert(parsedCmd ===
OptimizeTableCommand(None, Some(tblId("tbl")), Nil, DeltaOptimizeContext(isFull = true))(Nil))
assert(parsedCmd.asInstanceOf[OptimizeTableCommand].child ===
UnresolvedTable(Seq("tbl"), "OPTIMIZE"))

parsedCmd = parser.parsePlan("OPTIMIZE catalog_foo.db.tbl FULL")
assert(parsedCmd === OptimizeTableCommand(
None, Some(tblId("tbl", "db", "catalog_foo")), Nil, DeltaOptimizeContext(isFull = true))(Nil))
assert(parsedCmd.asInstanceOf[OptimizeTableCommand].child ===
UnresolvedTable(Seq("catalog_foo", "db", "tbl"), "OPTIMIZE"))

parsedCmd = parser.parsePlan("OPTIMIZE '/path/to/tbl' FULL")
assert(parsedCmd === OptimizeTableCommand(
Some("/path/to/tbl"), None, Nil, DeltaOptimizeContext(isFull = true))(Nil))
assert(parsedCmd.asInstanceOf[OptimizeTableCommand].child ===
UnresolvedPathBasedDeltaTable("/path/to/tbl", Map.empty, "OPTIMIZE"))

parsedCmd = parser.parsePlan("OPTIMIZE delta.`/path/to/tbl` FULL")
assert(parsedCmd === OptimizeTableCommand(
None, Some(tblId("/path/to/tbl", "delta")), Nil, DeltaOptimizeContext(isFull = true))(Nil))
assert(parsedCmd.asInstanceOf[OptimizeTableCommand].child ===
UnresolvedTable(Seq("delta", "/path/to/tbl"), "OPTIMIZE"))
}

test("OPTIMIZE command new tokens are non-reserved keywords") {
@@ -161,15 +186,18 @@ class DeltaSqlParserSuite extends SparkFunSuite with SQLHelper {
assert(parser.parsePlan("OPTIMIZE zorder") ===
OptimizeTableCommand(None, Some(tblId("zorder")), Nil)(Nil))

assert(parser.parsePlan("OPTIMIZE full") ===
OptimizeTableCommand(None, Some(tblId("full")), Nil)(Nil))

// Use the new keywords in column name
assert(parser.parsePlan("OPTIMIZE tbl WHERE zorder = 1 and optimize = 2") ===
assert(parser.parsePlan("OPTIMIZE tbl WHERE zorder = 1 and optimize = 2 and full = 3") ===
OptimizeTableCommand(None,
Some(tblId("tbl"))
, Seq("zorder = 1 and optimize = 2"))(Nil))
, Seq("zorder = 1 and optimize = 2 and full = 3"))(Nil))

assert(parser.parsePlan("OPTIMIZE tbl ZORDER BY (optimize, zorder)") ===
assert(parser.parsePlan("OPTIMIZE tbl ZORDER BY (optimize, zorder, full)") ===
OptimizeTableCommand(None, Some(tblId("tbl")), Nil)(
Seq(unresolvedAttr("optimize"), unresolvedAttr("zorder"))))
Seq(unresolvedAttr("optimize"), unresolvedAttr("zorder"), unresolvedAttr("full"))))
}

test("DESCRIBE DETAIL command is parsed as expected") {
@@ -59,6 +59,19 @@ trait ClusteredTableTestUtilsBase
verifyDescribeHistoryOperationParameters(table)
}

/**
* Runs optimize full on the table and calls postHook on the metrics.
*
 * @param table the name of the table
* @param postHook callback triggered with OptimizeMetrics returned by the OPTIMIZE command
*/
def runOptimizeFull(table: String)(postHook: OptimizeMetrics => Unit): Unit = {
postHook(sql(s"OPTIMIZE $table FULL").select($"metrics.*").as[OptimizeMetrics].head())

// Verify Delta history operation parameters' clusterBy
verifyDescribeHistoryOperationParameters(table)
}

def verifyClusteringColumnsInDomainMetadata(
snapshot: Snapshot,
logicalColumnNames: Seq[String]): Unit = {