# [Spark] Add OPTIMIZE FULL history support (#3852)

#### Which Delta project/connector is this regarding?

- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

Adds an `isFull` operation parameter to the commit history recorded for the `OPTIMIZE tbl FULL` command.
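
As a concrete illustration of the new behavior (a minimal sketch, assuming an active `SparkSession` named `spark` and an existing clustered table `tbl` — both placeholders):

```scala
// After this change, DESCRIBE HISTORY exposes whether an OPTIMIZE run on a
// clustered table was a FULL run via the new `isFull` operation parameter.
spark.sql("OPTIMIZE tbl FULL")
spark.sql("DESCRIBE HISTORY tbl")
  .selectExpr("version", "operation", "operationParameters['isFull'] AS isFull")
  .show()
// Expected shape of the latest row: operation = OPTIMIZE, isFull = true.
// A plain `OPTIMIZE tbl` on the same clustered table would record isFull = false,
// and a non-clustered table would not record the key at all.
```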

## How was this patch tested?
Existing unit tests.


## Does this PR introduce _any_ user-facing changes?

No
dabao521 authored Nov 12, 2024
1 parent 4f54313 commit fbdd347
Showing 3 changed files with 54 additions and 8 deletions.
**File 1/3 — `object DeltaOperations`:**

```diff
@@ -684,20 +684,25 @@ object DeltaOperations {
   val ZORDER_PARAMETER_KEY = "zOrderBy"
   /** parameter key to indicate clustering columns */
   val CLUSTERING_PARAMETER_KEY = "clusterBy"
+  /** parameter key to indicate the operation for `OPTIMIZE tbl FULL` */
+  val CLUSTERING_IS_FULL_KEY = "isFull"
 
   /** Recorded when optimizing the table. */
   case class Optimize(
       predicate: Seq[Expression],
       zOrderBy: Seq[String] = Seq.empty,
       auto: Boolean = false,
-      clusterBy: Option[Seq[String]] = None
+      clusterBy: Option[Seq[String]] = None,
+      isFull: Boolean = false
   ) extends OptimizeOrReorg(OPTIMIZE_OPERATION_NAME, predicate) {
     override val parameters: Map[String, Any] = super.parameters ++ Map(
       // When clustering columns are specified, set the zOrderBy key to empty.
       ZORDER_PARAMETER_KEY -> JsonUtils.toJson(if (clusterBy.isEmpty) zOrderBy else Seq.empty),
       CLUSTERING_PARAMETER_KEY -> JsonUtils.toJson(clusterBy.getOrElse(Seq.empty)),
       AUTO_COMPACTION_PARAMETER_KEY -> auto
     )
+      // `isFull` is not relevant for non-clustering tables, so skip it.
+      .++(clusterBy.filter(_.nonEmpty).map(_ => CLUSTERING_IS_FULL_KEY -> isFull))
 
     override val operationMetrics: Set[String] = DeltaOperationMetrics.OPTIMIZE
```
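
The subtle point above is that `isFull` is appended to the parameters map only when `clusterBy` holds a non-empty column list. A standalone sketch of that `Option`-based append (plain Scala with simplified types, not the Delta source):

```scala
object IsFullParamSketch extends App {
  // Mirrors the `.++(clusterBy.filter(_.nonEmpty).map(...))` pattern above:
  // the key is appended only for tables with clustering columns.
  def params(clusterBy: Option[Seq[String]], isFull: Boolean): Map[String, Any] =
    Map("clusterBy" -> clusterBy.getOrElse(Seq.empty).mkString(",")) ++
      clusterBy.filter(_.nonEmpty).map(_ => "isFull" -> isFull)

  assert(params(Some(Seq("c1", "c2")), isFull = true).contains("isFull"))
  assert(!params(None, isFull = true).contains("isFull"))            // no clustering
  assert(!params(Some(Seq.empty), isFull = true).contains("isFull")) // empty columns
}
```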
**File 2/3 — `class OptimizeExecutor`:**

```diff
@@ -576,7 +576,8 @@ class OptimizeExecutor(
         predicate = partitionPredicate,
         zOrderBy = zOrderByColumns,
         auto = isAutoCompact,
-        clusterBy = if (isClusteredTable) Option(clusteringColumns).filter(_.nonEmpty) else None)
+        clusterBy = if (isClusteredTable) Option(clusteringColumns).filter(_.nonEmpty) else None,
+        isFull = optimizeContext.isFull)
     }
   }
 
```
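
For context, a heavily simplified sketch of how the flag travels (these case classes are stand-ins, not the actual Delta types): `OPTIMIZE ... FULL` already set `isFull` on the optimize context, and this one-line change copies it into the recorded operation:

```scala
object IsFullFlowSketch extends App {
  // Stand-ins for the optimize context and the recorded Optimize operation.
  case class OptimizeContext(isFull: Boolean = false)
  case class Optimize(clusterBy: Option[Seq[String]], isFull: Boolean)

  // Mirrors the change above: thread context.isFull into the operation.
  def recordedOperation(ctx: OptimizeContext, clusteringColumns: Seq[String]): Optimize =
    Optimize(clusterBy = Option(clusteringColumns).filter(_.nonEmpty), isFull = ctx.isFull)

  assert(recordedOperation(OptimizeContext(isFull = true), Seq("c1")).isFull)
}
```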
**File 3/3 — `trait ClusteredTableTestUtilsBase`:**

```diff
@@ -19,6 +19,7 @@ package org.apache.spark.sql.delta.skipping
 import org.apache.spark.sql.delta.skipping.clustering.{ClusteredTableUtils, ClusteringColumn, ClusteringColumnInfo}
 import org.apache.spark.sql.delta.skipping.clustering.temp.ClusterBySpec
 import org.apache.spark.sql.delta.{DeltaLog, Snapshot}
+import org.apache.spark.sql.delta.DeltaOperations
 import org.apache.spark.sql.delta.DeltaOperations.{CLUSTERING_PARAMETER_KEY, ZORDER_PARAMETER_KEY}
 import org.apache.spark.sql.delta.commands.optimize.OptimizeMetrics
 import org.apache.spark.sql.delta.coordinatedcommits.CoordinatedCommitsBaseSuite
@@ -53,10 +54,41 @@ trait ClusteredTableTestUtilsBase
    * @param postHook callback triggered with OptimizeMetrics returned by the OPTIMIZE command
    */
   def runOptimize(table: String)(postHook: OptimizeMetrics => Unit): Unit = {
+    // Verify Delta history operation parameters' clusterBy
+    val isPathBasedTable = table.startsWith("tahoe.") || table.startsWith("delta.")
+    var (deltaLog, snapshot) = if (isPathBasedTable) {
+      // Path based table e.g. delta.`path-to-directory` or tahoe.`path-to-directory`. Strip
+      // 6 characters to extract table path.
+      DeltaLog.forTableWithSnapshot(spark, table.drop(6).replace("`", ""))
+    } else {
+      DeltaLog.forTableWithSnapshot(spark, TableIdentifier(table))
+    }
+    val beforeVersion = snapshot.version
+
     postHook(optimizeTable(table).select($"metrics.*").as[OptimizeMetrics].head())
+    snapshot = deltaLog.update()
+    val afterVersion = snapshot.version
 
-    // Verify Delta history operation parameters' clusterBy
-    verifyDescribeHistoryOperationParameters(table)
+    val shouldCheckFullStatus = deltaLog.history.getHistory(Some(1)).headOption.exists { h =>
+      Seq(DeltaOperations.OPTIMIZE_OPERATION_NAME
+      ).contains(h.operation)
+    }
+
+    // Note: Only expect isFull status when the table has non-empty clustering columns and
+    // clustering table feature, otherwise the OPTIMIZE will fall back to compaction and
+    // isFull status will not be relevant anymore.
+    val expectedOperationParameters = ClusteredTableUtils
+      .getClusteringColumnsOptional(snapshot)
+      .filter { cols =>
+        cols.nonEmpty &&
+        shouldCheckFullStatus &&
+        ClusteredTableUtils.isSupported(snapshot.protocol) &&
+        afterVersion > beforeVersion
+      }
+      .map(_ => Map(DeltaOperations.CLUSTERING_IS_FULL_KEY -> false))
+      .getOrElse(Map.empty)
+    verifyDescribeHistoryOperationParameters(
+      table, expectedOperationParameters = expectedOperationParameters)
   }
 
   /**
```
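
A quick standalone check of the table-name parsing used in `runOptimize` above: the `delta.` and `tahoe.` prefixes are both exactly six characters, so `drop(6)` plus stripping backticks recovers the filesystem path (the path below is a placeholder):

```scala
object TablePathSketch extends App {
  for (name <- Seq("delta.`/tmp/my-table`", "tahoe.`/tmp/my-table`")) {
    // drop(6) removes either prefix; the backticks around the path are stripped.
    val path = name.drop(6).replace("`", "")
    assert(path == "/tmp/my-table")
  }
}
```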
```diff
@@ -69,7 +101,8 @@ trait ClusteredTableTestUtilsBase
     postHook(sql(s"OPTIMIZE $table FULL").select($"metrics.*").as[OptimizeMetrics].head())
 
     // Verify Delta history operation parameters' clusterBy
-    verifyDescribeHistoryOperationParameters(table)
+    verifyDescribeHistoryOperationParameters(table, expectedOperationParameters = Map(
+      DeltaOperations.CLUSTERING_IS_FULL_KEY -> true))
   }
 
   def verifyClusteringColumnsInDomainMetadata(
@@ -83,8 +116,8 @@ trait ClusteredTableTestUtilsBase
 
   // Verify the operation parameters of the last history event contains `clusterBy`.
   protected def verifyDescribeHistoryOperationParameters(
-      table: String
-  ): Unit = {
+      table: String,
+      expectedOperationParameters: Map[String, Any] = Map.empty): Unit = {
     val clusterBySupportedOperations = Set(
       "CREATE TABLE",
       "REPLACE TABLE",
@@ -115,7 +148,8 @@ trait ClusteredTableTestUtilsBase
       "add the operation to the appropriate case in " +
      "verifyDescribeHistoryOperationParameters. " +
       s"table: $table, lastOperation: ${lastEvent.operation} " +
-      s"lastOperationParameters: $lastOperationParameters"
+      s"lastOperationParameters: $lastOperationParameters " +
+      s"expectedOperationParameters: $expectedOperationParameters"
     try {
       assert(assertion, debugMsg)
     } catch {
@@ -140,6 +174,12 @@ trait ClusteredTableTestUtilsBase
       doAssert(!lastOperationParameters.contains(CLUSTERING_PARAMETER_KEY))
     }
 
+    // Validate caller provided operator parameters from the last commit.
+    for ((operationParameterKey, value) <- expectedOperationParameters) {
+      // Convert value to string since value is stored as toString in operationParameters.
+      doAssert(lastOperationParameters(operationParameterKey) === value.toString)
+    }
+
     // Check clusterBy
     lastEvent.operation match {
       case "CLUSTER BY" =>
```
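
Because `operationParameters` stores every value as a string, the new validation loop compares against `value.toString`. A minimal standalone version of that check (the parameter values here are made up):

```scala
object ParamCheckSketch extends App {
  // History stores parameter values as strings, so a Boolean expectation of
  // `true` must match the string "true".
  val lastOperationParameters = Map("isFull" -> "true", "clusterBy" -> "[\"c1\"]")
  val expected: Map[String, Any] = Map("isFull" -> true)
  for ((key, value) <- expected) {
    assert(lastOperationParameters(key) == value.toString)
  }
}
```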
