[Spark] ALTER TABLE SYNC IDENTITY SQL support
c27kwan authored and zhipengmao-db committed Jul 22, 2024
1 parent 2c450fe commit 88d27a6
Showing 8 changed files with 270 additions and 6 deletions.
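
The commit wires the new statement through the grammar (DeltaSqlBase.g4), the parser, a new logical plan, an error message, the IdentityColumn helpers, DeltaCatalog, the ALTER command, and tests. As a quick orientation, a hypothetical end-to-end use of the command this commit adds — the SYNC IDENTITY statement is what the diff introduces; the identity-column CREATE TABLE syntax is assumed to be available (the test suite below uses a createTable helper instead):

// Sketch: an explicit insert can outrun the IDENTITY high water mark;
// SYNC IDENTITY recomputes the mark from the data actually in the table.
spark.sql("CREATE TABLE t (id BIGINT GENERATED BY DEFAULT AS IDENTITY, v STRING) USING delta")
spark.sql("INSERT INTO t VALUES (100, 'a')")
spark.sql("ALTER TABLE t ALTER COLUMN id SYNC IDENTITY")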
7 changes: 7 additions & 0 deletions spark/src/main/antlr4/io/delta/sql/parser/DeltaSqlBase.g4
@@ -91,6 +91,8 @@ statement
DROP FEATURE featureName=featureNameValue (TRUNCATE HISTORY)? #alterTableDropFeature
| ALTER TABLE table=qualifiedName
(clusterBySpec | CLUSTER BY NONE) #alterTableClusterBy
| ALTER TABLE table=qualifiedName
(ALTER | CHANGE) COLUMN? column=qualifiedName SYNC IDENTITY #alterTableSyncIdentity
| OPTIMIZE (path=STRING | table=qualifiedName)
(WHERE partitionPredicate=predicateToken)?
(zorderSpec)? #optimizeTable
@@ -236,6 +238,7 @@ nonReserved
| CONVERT | TO | DELTA | PARTITIONED | BY
| DESC | DESCRIBE | LIMIT | DETAIL
| GENERATE | FOR | TABLE | CHECK | EXISTS | OPTIMIZE
| IDENTITY | SYNC | COLUMN | CHANGE
| REORG | APPLY | PURGE | UPGRADE | UNIFORM | ICEBERG_COMPAT_VERSION
| RESTORE | AS | OF
| ZORDER | LEFT_PAREN | RIGHT_PAREN
@@ -251,9 +254,11 @@ ALTER: 'ALTER';
APPLY: 'APPLY';
AS: 'AS';
BY: 'BY';
CHANGE: 'CHANGE';
CHECK: 'CHECK';
CLONE: 'CLONE';
CLUSTER: 'CLUSTER';
COLUMN: 'COLUMN';
COMMA: ',';
COMMENT: 'COMMENT';
CONSTRAINT: 'CONSTRAINT';
@@ -274,6 +279,7 @@ GENERATE: 'GENERATE';
HISTORY: 'HISTORY';
HOURS: 'HOURS';
ICEBERG_COMPAT_VERSION: 'ICEBERG_COMPAT_VERSION';
IDENTITY: 'IDENTITY';
IF: 'IF';
INVENTORY: 'INVENTORY';
LEFT_PAREN: '(';
@@ -296,6 +302,7 @@ RETAIN: 'RETAIN';
RIGHT_PAREN: ')';
RUN: 'RUN';
SHALLOW: 'SHALLOW';
SYNC: 'SYNC';
SYSTEM_TIME: 'SYSTEM_TIME';
SYSTEM_VERSION: 'SYSTEM_VERSION';
TABLE: 'TABLE';
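The new alterTableSyncIdentity rule accepts either verb, and `COLUMN?` makes the keyword optional, so all of the following parse to the same statement (table name illustrative):

spark.sql("ALTER TABLE events ALTER COLUMN id SYNC IDENTITY")
spark.sql("ALTER TABLE events CHANGE COLUMN id SYNC IDENTITY") // CHANGE is a synonym here
spark.sql("ALTER TABLE events ALTER id SYNC IDENTITY")         // COLUMN keyword omitted
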
15 changes: 14 additions & 1 deletion spark/src/main/scala/io/delta/sql/parser/DeltaSqlParser.scala
@@ -61,7 +61,7 @@ import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.analysis.UnresolvedTableImplicits._
import org.apache.spark.sql.catalyst.parser.{CompoundBody, ParseErrorListener, ParseException, ParserInterface, ParserInterfaceShims}
import org.apache.spark.sql.catalyst.parser.ParserUtils.{checkDuplicateClauses, string, withOrigin}
import org.apache.spark.sql.catalyst.plans.logical.{AlterTableAddConstraint, AlterTableDropConstraint, AlterTableDropFeature, CloneTableStatement, LogicalPlan, RestoreTableStatement}
import org.apache.spark.sql.catalyst.plans.logical.{AlterColumnSyncIdentity, AlterTableAddConstraint, AlterTableDropConstraint, AlterTableDropFeature, CloneTableStatement, LogicalPlan, RestoreTableStatement}
import org.apache.spark.sql.catalyst.trees.Origin
import org.apache.spark.sql.connector.catalog.{CatalogV2Util, TableCatalog}
import org.apache.spark.sql.errors.QueryParsingErrors
@@ -560,6 +560,19 @@ class DeltaSqlAstBuilder extends DeltaSqlBaseBaseVisitor[AnyRef] {
ifExists = ctx.EXISTS != null)
}

/**
 * Builds the logical plan for the `ALTER TABLE ... (ALTER | CHANGE) COLUMN ... SYNC IDENTITY` command.
*/
override def visitAlterTableSyncIdentity(
ctx: AlterTableSyncIdentityContext): LogicalPlan = withOrigin(ctx) {
val verb = if (ctx.CHANGE != null) "CHANGE" else "ALTER"
AlterColumnSyncIdentity(
UnresolvedTable(ctx.table.identifier.asScala.map(_.getText).toSeq,
s"ALTER TABLE ... $verb COLUMN"),
UnresolvedFieldName(visitMultipartIdentifier(ctx.column))
)
}

/**
 * A featureNameValue can be either a string literal or an identifier. This function extracts
 * the featureNameValue based on whether it's a string literal or an identifier.
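For reference, a rough sketch of the plan the visitor above builds for "ALTER TABLE db.tbl ALTER COLUMN id SYNC IDENTITY"; the unresolved nodes are resolved later by the analyzer:

AlterColumnSyncIdentity(
  UnresolvedTable(Seq("db", "tbl"), "ALTER TABLE ... ALTER COLUMN"),
  UnresolvedFieldName(Seq("id")))
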
@@ -0,0 +1,44 @@
/*
* Copyright (2021) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.plans.logical

import org.apache.spark.sql.catalyst.analysis.FieldName
import org.apache.spark.sql.connector.catalog.TableChange
import org.apache.spark.sql.connector.catalog.TableChange.ColumnChange

/**
 * A `ColumnChange` to model the `ALTER TABLE ... ALTER COLUMN ... SYNC IDENTITY` command.
*
* @param fieldNames The (potentially nested) column name.
*/
case class SyncIdentity(fieldNames: Array[String]) extends ColumnChange {
require(fieldNames.size == 1, "IDENTITY column cannot be a nested column.")
}

case class AlterColumnSyncIdentity(
table: LogicalPlan,
column: FieldName)
extends AlterTableCommand {
override def changes: Seq[TableChange] = {
require(column.resolved, "FieldName should be resolved before it's converted to TableChange.")
val colName = column.name.toArray
Seq(SyncIdentity(colName))
}

override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan =
copy(table = newChild)
}
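
A minimal sketch of the constructor guard above: SyncIdentity accepts exactly one field name, so nested column paths fail fast at construction.

SyncIdentity(Array("id"))               // ok: a top-level column
// SyncIdentity(Array("outer", "inner")) // would throw: "IDENTITY column cannot be a nested column."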
@@ -2453,6 +2453,11 @@ trait DeltaErrorsBase
)
}

def identityColumnAlterNonIdentityColumnError(): Throwable = {
new AnalysisException(
"ALTER TABLE ALTER COLUMN SYNC IDENTITY cannot be called on non IDENTITY columns")
}

def identityColumnInconsistentMetadata(
colName: String,
hasStart: Boolean,
@@ -45,6 +45,12 @@ object IdentityColumn extends DeltaLogging {
// When tables with IDENTITY columns are written into.
val opTypeWrite = "delta.identityColumn.write"

// Return true if `field` is an identity column that allows explicit insert. Caller must ensure
// `isIdentityColumn(field)` is true.
def allowExplicitInsert(field: StructField): Boolean = {
field.metadata.getBoolean(IDENTITY_INFO_ALLOW_EXPLICIT_INSERT)
}

// Return all the IDENTITY columns from `schema`.
def getIdentityColumns(schema: StructType): Seq[StructField] = {
schema.filter(ColumnWithDefaultExprUtils.isIdentityColumn)
@@ -192,6 +198,47 @@ object IdentityColumn extends DeltaLogging {
)
}
}

// Calculates the sync'ed IDENTITY high water mark based on actual data and returns a
// potentially updated `StructField`.
def syncIdentity(field: StructField, df: DataFrame): StructField = {
// Round `value` to the next value that follows start and step configuration.
def roundToNext(start: Long, step: Long, value: Long): Long = {
if (Math.subtractExact(value, start) % step == 0) {
value
} else {
// start + step * ((value - start) / step + 1)
Math.addExact(
Math.multiplyExact(Math.addExact(Math.subtractExact(value, start) / step, 1), step),
start)
}
}

assert(ColumnWithDefaultExprUtils.isIdentityColumn(field))
// Run a query to compute the actual high water mark (max or min value of the IDENTITY
// column) from the table's data.
val info = getIdentityInfo(field)
val positiveStep = info.step > 0
val expr = if (positiveStep) max(field.name) else min(field.name)
val resultRow = df.select(expr).collect().head

if (!resultRow.isNullAt(0)) {
val result = resultRow.getLong(0)
val isBeforeStart = if (positiveStep) result < info.start else result > info.start
val newHighWaterMark = roundToNext(info.start, info.step, result)
if (isBeforeStart || info.highWaterMark.contains(newHighWaterMark)) {
field
} else {
val newMetadata = new MetadataBuilder().withMetadata(field.metadata)
.putLong(IDENTITY_INFO_HIGHWATERMARK, newHighWaterMark)
.build()
field.copy(metadata = newMetadata)
}
} else {
field
}
}

// Return IDENTITY information of column `field`. Caller must ensure `isIdentityColumn(field)`
// is true.
def getIdentityInfo(field: StructField): IdentityInfo = {
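A standalone copy of the roundToNext helper from syncIdentity above, with a few worked values showing how an observed max/min is rounded onto the identity sequence:

def roundToNext(start: Long, step: Long, value: Long): Long = {
  if (Math.subtractExact(value, start) % step == 0) {
    value
  } else {
    Math.addExact(
      Math.multiplyExact(Math.addExact(Math.subtractExact(value, start) / step, 1), step),
      start)
  }
}
roundToNext(start = 1, step = 3, value = 7)    // 7: already on the sequence 1, 4, 7, ...
roundToNext(start = 1, step = 3, value = 8)    // 10: the next value on that sequence
roundToNext(start = -1, step = -3, value = -6) // -7: the next value on -1, -4, -7, ...
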
@@ -27,8 +27,8 @@ import scala.collection.mutable
import org.apache.spark.sql.delta.skipping.clustering.ClusteredTableUtils
import org.apache.spark.sql.delta.skipping.clustering.temp.{ClusterBy, ClusterBySpec}
import org.apache.spark.sql.delta.skipping.clustering.temp.{ClusterByTransform => TempClusterByTransform}
import org.apache.spark.sql.delta.{DeltaConfigs, DeltaErrors, DeltaTableUtils}
import org.apache.spark.sql.delta.{DeltaLog, DeltaOptions}
import org.apache.spark.sql.delta.{ColumnWithDefaultExprUtils, DeltaConfigs, DeltaErrors, DeltaTableUtils}
import org.apache.spark.sql.delta.{DeltaLog, DeltaOptions, IdentityColumn}
import org.apache.spark.sql.delta.DeltaTableIdentifier.gluePermissionError
import org.apache.spark.sql.delta.commands._
import org.apache.spark.sql.delta.constraints.{AddConstraint, DropConstraint}
@@ -46,7 +46,7 @@ import org.apache.spark.sql.{AnalysisException, DataFrame, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchNamespaceException, NoSuchTableException, UnresolvedAttribute, UnresolvedFieldName, UnresolvedFieldPosition}
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogTableType, CatalogUtils, SessionCatalog}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, QualifiedColType}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, QualifiedColType, SyncIdentity}
import org.apache.spark.sql.connector.catalog.{DelegatingCatalogExtension, Identifier, StagedTable, StagingTableCatalog, SupportsWrite, Table, TableCapability, TableCatalog, TableChange, V1Table}
import org.apache.spark.sql.connector.catalog.TableCapability._
import org.apache.spark.sql.connector.catalog.TableChange._
@@ -718,6 +718,18 @@ class DeltaCatalog extends DelegatingCatalogExtension
val (oldField, pos) = getColumn(field)
columnUpdates(field) = oldField.copy(name = rename.newName()) -> pos

case sync: SyncIdentity =>
syncIdentity = true
val field = sync.fieldNames
val (oldField, pos) = getColumn(field)
if (!ColumnWithDefaultExprUtils.isIdentityColumn(oldField)) {
throw DeltaErrors.identityColumnAlterNonIdentityColumnError()
}
// If the IDENTITY column does not allow explicit inserts, the high water mark is
// always kept in sync, so this is a no-op.
if (IdentityColumn.allowExplicitInsert(oldField)) {
columnUpdates(field) = oldField.copy() -> pos
}

case updateDefault: UpdateColumnDefaultValue =>
val field = updateDefault.fieldNames()
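Per the branch above, a column that disallows explicit inserts always has an accurate high water mark, so the command resolves but schedules no column update for it; a sketch with a hypothetical table (assuming GENERATED ALWAYS AS IDENTITY DDL support):

spark.sql("CREATE TABLE t2 (id BIGINT GENERATED ALWAYS AS IDENTITY, v STRING) USING delta")
spark.sql("ALTER TABLE t2 ALTER COLUMN id SYNC IDENTITY") // validates the column, then is a no-op
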
@@ -619,6 +619,13 @@ case class AlterTableChangeColumnDeltaCommand(
verifyColumnChange(sparkSession, oldColumnForVerification, resolver, txn)

val newField = {
if (syncIdentity) {
assert(oldColumn == newColumn)
val df = txn.snapshot.deltaLog.createDataFrame(txn.snapshot, txn.filterFiles())
val field = IdentityColumn.syncIdentity(newColumn, df)
// Mark the entire table as read so concurrent transactions conflict with this sync.
txn.readWholeTable()
field
} else {
// Take the name, comment, nullability and data type from newField
// It's crucial to keep the old column's metadata, which may contain column mapping
// metadata.
@@ -628,13 +635,26 @@
case Some(newDefaultValue) => result.withCurrentDefaultValue(newDefaultValue)
case None => result.clearCurrentDefaultValue()
}

val updatedColumnMetadata =
if (!SparkCharVarcharUtils.hasCharVarchar(newColumn.dataType)) {
// Remove the char/varchar property from the metadata that
// indicates that this column is a char/varchar column.
// We construct this throwaway object because
// CharVarcharUtils.cleanAttrMetadata takes an AttributeReference.
val throwAwayAttrRef = AttributeReference(
result.name, result.dataType, nullable = result.nullable, result.metadata)()
CharVarcharUtils.cleanAttrMetadata(throwAwayAttrRef).metadata
} else {
result.metadata
}
result
.copy(
name = newColumn.name,
dataType =
SchemaUtils.changeDataType(oldColumn.dataType, newColumn.dataType, resolver),
nullable = newColumn.nullable)
nullable = newColumn.nullable,
metadata = updatedColumnMetadata)
}
}

// Replace existing field with new field
@@ -0,0 +1,116 @@
/*
* Copyright (2021) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.delta

import org.apache.spark.sql.delta.GeneratedAsIdentityType.GeneratedByDefault
import org.apache.spark.sql.delta.sources.DeltaSourceUtils

import org.apache.spark.sql.{AnalysisException, Row}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.types._


/**
* Identity Column test suite for the SYNC IDENTITY command.
*/
trait IdentityColumnSyncSuiteBase
extends IdentityColumnTestUtils {
private val tblName = "identity_test"

/**
 * Creates a table with a single identity column "id" (generated by default) and a single
 * string "value" column, runs `f`, and drops the table afterwards.
*/
private def withSimpleGeneratedByDefaultTable(
startsWith: Long, incrementBy: Long)(f: => Unit): Unit = {
withTable(tblName) {
createTable(
tblName,
Seq(
IdentityColumnSpec(
GeneratedByDefault,
startsWith = Some(startsWith),
incrementBy = Some(incrementBy)),
TestColumnSpec(colName = "value", dataType = StringType)
)
)

f
}
}

test("alter table sync identity delta") {
val starts = Seq(-1, 1)
val steps = Seq(-3, 3)
for (start <- starts; step <- steps) {
withSimpleGeneratedByDefaultTable(start, step) {
// Test empty table.
val oldSchema = DeltaLog.forTable(spark, TableIdentifier(tblName)).snapshot.schema
sql(s"ALTER TABLE $tblName ALTER COLUMN id SYNC IDENTITY")
assert(DeltaLog.forTable(spark, TableIdentifier(tblName)).snapshot.schema == oldSchema)

// Test a series of values, not all of which follow the start and step configuration.
for (i <- start to (start + step * 10)) {
sql(s"INSERT INTO $tblName VALUES($i, 'v')")
sql(s"ALTER TABLE $tblName ALTER COLUMN id SYNC IDENTITY")
val expected = start + (((i - start) + (step - 1)) / step) * step
val schema = DeltaLog.forTable(spark, TableIdentifier(tblName)).snapshot.schema
assert(schema("id").metadata.getLong(DeltaSourceUtils.IDENTITY_INFO_HIGHWATERMARK) ==
expected)
}
}
}
}

test("alter table sync identity overflow") {
withSimpleGeneratedByDefaultTable(startsWith = 1L, incrementBy = 10L) {
sql(s"INSERT INTO $tblName VALUES (${Long.MaxValue}, 'a')")
intercept[ArithmeticException](sql(s"ALTER TABLE $tblName ALTER COLUMN id SYNC IDENTITY"))
}
}


test("alter table sync identity non identity column") {
withTable(tblName) {
createTable(
tblName,
Seq(
TestColumnSpec(colName = "id", dataType = LongType),
TestColumnSpec(colName = "value", dataType = IntegerType)
)
)
val ex = intercept[AnalysisException] {
sql(s"ALTER TABLE $tblName ALTER COLUMN id SYNC IDENTITY")
}
assert(ex.getMessage.contains("ALTER TABLE ALTER COLUMN SYNC IDENTITY cannot be called"))
}
}
}

class IdentityColumnSyncScalaSuite
extends IdentityColumnSyncSuiteBase
with ScalaDDLTestUtils

class IdentityColumnSyncScalaIdColumnMappingSuite
extends IdentityColumnSyncSuiteBase
with ScalaDDLTestUtils
with DeltaColumnMappingEnableIdMode

class IdentityColumnSyncScalaNameColumnMappingSuite
extends IdentityColumnSyncSuiteBase
with ScalaDDLTestUtils
with DeltaColumnMappingEnableNameMode
