
Commit

Minor refactor to DeltaErrors and DeltaErrorsSuite
GitOrigin-RevId: ceacc3a6239f36e76cc8cf4f3447285bb06fc6a0

CDF + Vacuum tests

This PR adds two tests covering the integration of CDF with VACUUM.

Closes delta-io#1177

GitOrigin-RevId: f2f7b187cb3cc78c267d378eaf3d0657a56241d9

Adds miscellaneous tests for CDF (e.g. end-to-end workload and CDCReader tests).

Resolves delta-io#1178

GitOrigin-RevId: 5c7da4ff9413d84e73137a80673872065de8267b

Minor refactor to UpdateCommand

GitOrigin-RevId: ab3fcbe0522aa194ac0781730a50112655d5c7ec

 Fixes delta-io#348 Support Dynamic Partition Overwrite

The goal of this PR is to support dynamic partition overwrite mode on writes to Delta.

To enable this for an individual write, add `.option("partitionOverwriteMode", "dynamic")`. It can also be set per SparkSession in the SQL config using `.config("spark.sql.sources.partitionOverwriteMode", "dynamic")`.
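As an illustration, here is a minimal sketch of a per-write dynamic partition overwrite. The table path, column names, and values are hypothetical and not part of this change; it assumes a spark-shell or similar session with Delta Lake on the classpath.

```scala
// Hypothetical example; assumes Delta Lake (delta-core) is on the classpath.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("dynamic-partition-overwrite-example")
  // Alternatively, enable the mode for the whole session:
  // .config("spark.sql.sources.partitionOverwriteMode", "dynamic")
  .getOrCreate()
import spark.implicits._

val path = "/tmp/delta/events"  // hypothetical table location

// Seed a Delta table partitioned by `date` with two partitions.
Seq(("2022-06-01", 1), ("2022-06-02", 2)).toDF("date", "value")
  .write.format("delta").partitionBy("date").save(path)

// Dynamic partition overwrite: only the partition present in the incoming
// data ("2022-06-02") is replaced; the "2022-06-01" partition is untouched.
Seq(("2022-06-02", 20)).toDF("date", "value")
  .write.format("delta")
  .mode("overwrite")
  .option("partitionOverwriteMode", "dynamic")
  .save(path)

spark.read.format("delta").load(path).orderBy("date").show()
```

With static overwrite (the default) the same `overwrite` write would replace the entire table, so the `2022-06-01` partition would be dropped rather than preserved.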

Some limitations of this PR:
- Dynamic partition overwrite mode in combination with `replaceWhere` is not supported; setting both results in an error.
- The SQL `INSERT OVERWRITE` syntax does not yet support dynamic partition overwrite. Supporting it will require further changes to `org.apache.spark.sql.delta.catalog.DeltaTableV2` and related classes.

Fixes delta-io#348
Closes delta-io#371

Signed-off-by: Allison Portis <allison.portis@databricks.com>
GitOrigin-RevId: 5b01e5b04e573dabe91ac2d71991a127617b8038

Add Checkpoint + CDF test.

This PR adds a test to ensure CDC fields are not included in the checkpoint file.

Resolves delta-io#1180

GitOrigin-RevId: d4a7b8bc4d1a79d30806ff18c5b507f6edcd964c
scottsand-db authored and mmengarelli committed Jun 8, 2022
1 parent 3c72c1d commit 6a40006
Showing 12 changed files with 771 additions and 31 deletions.
21 changes: 6 additions & 15 deletions core/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala
@@ -18,11 +18,10 @@ package org.apache.spark.sql.delta

// scalastyle:off import.ordering.noEmptyLine
import java.io.{FileNotFoundException, IOException}
import java.net.URI
import java.nio.file.FileAlreadyExistsException
import java.util.ConcurrentModificationException

import org.apache.spark.sql.delta.actions.{CommitInfo, FileAction, Metadata, Protocol}
import org.apache.spark.sql.delta.actions.{CommitInfo, Metadata, Protocol}
import org.apache.spark.sql.delta.catalog.DeltaCatalog
import org.apache.spark.sql.delta.constraints.Constraints
import org.apache.spark.sql.delta.hooks.PostCommitHook
@@ -38,12 +37,11 @@ import org.apache.spark.{SparkConf, SparkEnv, SparkException}
import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable}
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.connector.catalog.{Identifier, TableChange}
import org.apache.spark.sql.connector.catalog.Identifier
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.{DataType, StructField, StructType}


@@ -106,11 +104,12 @@ trait DocsPath {
/**
* A holder object for Delta errors.
*
*
* IMPORTANT: Any time you add a test that references the docs, add to the Seq defined in
* DeltaErrorsSuite so that the doc links that are generated can be verified to work in
* docs.delta.io
*/
object DeltaErrors
trait DeltaErrorsBase
extends DocsPath
with DeltaLogging {

@@ -176,7 +175,6 @@ object DeltaErrors
messageParameters = Array(s"$src", s"$dest"))
}


/**
* Thrown when main table data contains columns that are reserved for CDF, such as `_change_type`.
*/
@@ -990,7 +988,6 @@ object DeltaErrors
)
}


def multipleSourceRowMatchingTargetRowInMergeException(spark: SparkSession): Throwable = {
new DeltaUnsupportedOperationException(
errorClass = "DELTA_MULTIPLE_SOURCE_ROW_MATCHING_TARGET_ROW_IN_MERGE",
@@ -1029,7 +1026,6 @@ object DeltaErrors
)
}


def nestedFieldNotSupported(operation: String, field: String): Throwable = {
new DeltaAnalysisException(
errorClass = "DELTA_UNSUPPORTED_NESTED_FIELD_IN_OPERATION",
@@ -1381,7 +1377,6 @@ object DeltaErrors
new AnalysisException(s"Operation $operationName can not be performed on a view")
}


def postCommitHookFailedException(
failedHook: PostCommitHook,
failedOnCommitVersion: Long,
@@ -1649,7 +1644,6 @@ object DeltaErrors
)
}


def unsupportedTruncateSampleTables: Throwable = {
new DeltaAnalysisException(
errorClass = "DELTA_UNSUPPORTED_TRUNCATE_SAMPLE_TABLES",
@@ -1806,7 +1800,6 @@ object DeltaErrors
|""".stripMargin)
}


def missingColumnsInInsertInto(column: String): Throwable = {
new DeltaAnalysisException(
errorClass = "DELTA_INSERT_COLUMN_MISMATCH",
@@ -1943,7 +1936,6 @@ object DeltaErrors
new DeltaAnalysisException(errorClass = "DELTA_UNSET_NON_EXISTENT_PROPERTY", Array(key, table))
}


def identityColumnNotSupported(): Throwable = {
new AnalysisException("IDENTITY column is not supported")
}
@@ -1969,7 +1961,6 @@ object DeltaErrors
new DeltaIllegalStateException(errorClass = "DELTA_ITERATOR_ALREADY_CLOSED")
}


def activeTransactionAlreadySet(): Throwable = {
new DeltaIllegalStateException(errorClass = "DELTA_ACTIVE_TRANSACTION_ALREADY_SET")
}
@@ -2159,6 +2150,7 @@ object DeltaErrors
}
}

object DeltaErrors extends DeltaErrorsBase
/** The basic class for all Tahoe commit conflict exceptions. */
abstract class DeltaConcurrentModificationException(message: String)
extends ConcurrentModificationException(message) {
@@ -2329,7 +2321,6 @@ class MetadataMismatchErrorBuilder {
}
}


class DeltaColumnMappingUnsupportedException(
errorClass: String,
messageParameters: Array[String] = Array.empty)
@@ -22,7 +22,7 @@ import java.util.regex.PatternSyntaxException
import scala.util.Try
import scala.util.matching.Regex

import org.apache.spark.sql.delta.DeltaOptions.{DATA_CHANGE_OPTION, MERGE_SCHEMA_OPTION, OVERWRITE_SCHEMA_OPTION}
import org.apache.spark.sql.delta.DeltaOptions.{DATA_CHANGE_OPTION, MERGE_SCHEMA_OPTION, OVERWRITE_SCHEMA_OPTION, PARTITION_OVERWRITE_MODE_OPTION}
import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.spark.sql.delta.sources.DeltaSQLConf

@@ -113,6 +113,20 @@ trait DeltaWriteOptionsImpl extends DeltaOptionParser {
}

validateIdempotentWriteOptions()

/** Whether to overwrite only the partitions that have data written to them at runtime. */
def isDynamicPartitionOverwriteMode: Boolean = {
val mode = options.get(PARTITION_OVERWRITE_MODE_OPTION)
.getOrElse(sqlConf.getConf(SQLConf.PARTITION_OVERWRITE_MODE))
val acceptable = Seq("STATIC", "DYNAMIC")
if (!acceptable.exists(mode.equalsIgnoreCase(_))) {
val acceptableStr = acceptable.map("'" + _ + "'").mkString(" or ")
throw DeltaErrors.illegalDeltaOptionException(
PARTITION_OVERWRITE_MODE_OPTION, mode, s"must be ${acceptableStr}"
)
}
mode.equalsIgnoreCase("DYNAMIC")
}
}

trait DeltaReadOptions extends DeltaOptionParser {
@@ -197,6 +211,7 @@ object DeltaOptions extends DeltaLogging {
val OVERWRITE_SCHEMA_OPTION = "overwriteSchema"
/** An option to specify user-defined metadata in commitInfo */
val USER_METADATA_OPTION = "userMetadata"
val PARTITION_OVERWRITE_MODE_OPTION = "partitionOverwriteMode"

val MAX_FILES_PER_TRIGGER_OPTION = "maxFilesPerTrigger"
val MAX_FILES_PER_TRIGGER_OPTION_DEFAULT = 1000
@@ -227,6 +242,7 @@ object DeltaOptions extends DeltaLogging {
EXCLUDE_REGEX_OPTION,
OVERWRITE_SCHEMA_OPTION,
USER_METADATA_OPTION,
PARTITION_OVERWRITE_MODE_OPTION,
MAX_FILES_PER_TRIGGER_OPTION,
IGNORE_FILE_DELETION_OPTION,
IGNORE_CHANGES_OPTION,
@@ -500,6 +500,18 @@ trait OptimisticTransactionImpl extends TransactionalWrite
scan.files
}

/** Returns files within the given partitions. */
def filterFiles(partitions: Set[Map[String, String]]): Seq[AddFile] = {
import org.apache.spark.sql.functions.{array, col}
val partitionValues = partitions.map { partition =>
metadata.physicalPartitionColumns.map(partition).toArray
}
val predicate = array(metadata.partitionColumns.map(col): _*)
.isInCollection(partitionValues)
.expr
filterFiles(Seq(predicate))
}

/** Mark the entire table as tainted by this transaction. */
def readWholeTable(): Unit = {
readPredicates += Literal.TrueLiteral
@@ -126,7 +126,7 @@ case class UpdateCommand(
true
}.asNondeterministic()
val pathsToRewrite =
withStatusCode("DELTA", s"Finding files to rewrite for UPDATE operation") {
withStatusCode("DELTA", UpdateCommand.FINDING_TOUCHED_FILES_MSG) {
data.filter(new Column(updateCondition))
.filter(updatedRowUdf())
.select(input_file_name())
@@ -147,7 +147,7 @@
Nil
} else {
// Generate the new files containing the updated values
withStatusCode("DELTA", s"Rewriting ${filesToRewrite.size} files for UPDATE operation") {
withStatusCode("DELTA", UpdateCommand.rewritingFilesMsg(filesToRewrite.size)) {
rewriteFiles(sparkSession, txn, tahoeFileIndex.path,
filesToRewrite.map(_.path), nameToAddFile, updateCondition)
}
@@ -245,6 +245,10 @@
object UpdateCommand {
val FILE_NAME_COLUMN = "_input_file_name_"
val CONDITION_COLUMN_NAME = "__condition__"
val FINDING_TOUCHED_FILES_MSG: String = "Finding files to rewrite for UPDATE operation"

def rewritingFilesMsg(numFilesToRewrite: Long): String =
s"Rewriting $numFilesToRewrite files for UPDATE operation"

/**
* Whether or not CDC is enabled on this table and, thus, if we should output CDC data during this
@@ -54,6 +54,11 @@ import org.apache.spark.sql.types.{StringType, StructType}
* In combination with `Overwrite`, a `replaceWhere` option can be used to transactionally
* replace data that matches a predicate.
*
* In combination with `Overwrite`, dynamic partition overwrite mode (option `partitionOverwriteMode`
* set to `dynamic`, or Spark conf `spark.sql.sources.partitionOverwriteMode` set to `dynamic`)
* is also supported. However, a `replaceWhere` option cannot be used while dynamic partition
* overwrite mode is enabled.
*
* @param schemaInCatalog The schema created in Catalog. We will use this schema to update metadata
* when it is set (in CTAS code path), and otherwise use schema from `data`.
*/
@@ -131,6 +136,18 @@ case class WriteIntoDelta(
val replaceOnDataColsEnabled =
sparkSession.conf.get(DeltaSQLConf.REPLACEWHERE_DATACOLUMNS_ENABLED)

val useDynamicPartitionOverwriteMode = {
val useDynamic = txn.metadata.partitionColumns.nonEmpty &&
options.isDynamicPartitionOverwriteMode
options.replaceWhere.foreach { _ =>
if (useDynamic) {
throw new AnalysisException(s"'${DeltaOptions.REPLACE_WHERE_OPTION}' cannot be used" +
s" when '${DeltaOptions.PARTITION_OVERWRITE_MODE_OPTION}' is set to 'DYNAMIC'")
}
}
useDynamic
}

// Validate partition predicates
var containsDataFilters = false
val replaceWhere = options.replaceWhere.flatMap { replace =>
@@ -217,7 +234,17 @@ case class WriteIntoDelta(
removeFiles(sparkSession, txn, condition))
case (SaveMode.Overwrite, None) =>
val newFiles = txn.writeFiles(data, Some(options))
(newFiles, newFiles.collect { case a: AddFile => a }, txn.filterFiles().map(_.remove))
val addFiles = newFiles.collect { case a: AddFile => a }
val deletedFiles = if (useDynamicPartitionOverwriteMode) {
// With dynamic partition overwrite, all existing data in any partition that is
// being written to will be deleted.
// The next two lines select which files to delete.
val updatePartitions = addFiles.map(_.partitionValues).toSet
txn.filterFiles(updatePartitions).map(_.remove)
} else {
txn.filterFiles().map(_.remove)
}
(newFiles, addFiles, deletedFiles)
case _ =>
val newFiles = txn.writeFiles(data, Some(options))
(newFiles, newFiles.collect { case a: AddFile => a }, Nil)
@@ -19,20 +19,23 @@ package org.apache.spark.sql.delta
import java.io.File
import java.net.URI

// scalastyle:off import.ordering.noEmptyLine
import org.apache.spark.sql.delta.actions.AddCDCFile
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.storage.LocalLogStore
import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
import org.apache.spark.sql.delta.util.FileNames
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FSDataOutputStream, Path, RawLocalFileSystem}
import org.apache.hadoop.fs.permission.FsPermission
import org.apache.hadoop.util.Progressable

import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.{QueryTest, Row}
import org.apache.spark.sql.test.SharedSparkSession


class CheckpointsSuite extends QueryTest
with SharedSparkSession {
with SharedSparkSession with DeltaSQLCommandTest {

protected override def sparkConf = {
// Set the gs LogStore impl to `LocalLogStore` so that it will work with `FakeGCSFileSystem`.
@@ -170,6 +173,39 @@ class CheckpointsSuite extends QueryTest
}
}
}

test("checkpoint does not contain CDC field") {
withSQLConf(
DeltaConfigs.CHANGE_DATA_FEED.defaultTablePropertyKey -> "true"
) {
withTempDir { tempDir =>
withTempView("src") {
spark.range(10).write.format("delta").save(tempDir.getAbsolutePath)
spark.range(5, 15).createOrReplaceTempView("src")
sql(
s"""
|MERGE INTO delta.`$tempDir` t USING src s ON t.id = s.id
|WHEN MATCHED THEN DELETE
|WHEN NOT MATCHED THEN INSERT *
|""".stripMargin)
checkAnswer(
spark.read.format("delta").load(tempDir.getAbsolutePath),
Seq(0, 1, 2, 3, 4, 10, 11, 12, 13, 14).map { i => Row(i) })

// CDC should exist in the log as seen through getChanges, but it shouldn't be in the
// snapshots and the checkpoint file shouldn't have a CDC column.
val deltaLog = DeltaLog.forTable(spark, tempDir.getAbsolutePath)
assert(deltaLog.getChanges(1).next()._2.exists(_.isInstanceOf[AddCDCFile]))
assert(deltaLog.snapshot.stateDS.collect().forall { sa => sa.cdc == null })
deltaLog.checkpoint()
val checkpointFile = FileNames.checkpointFileSingular(deltaLog.logPath, 1)
val checkpointSchema = spark.read.format("parquet").load(checkpointFile.toString).schema
assert(checkpointSchema.fieldNames.toSeq ==
Seq("txn", "add", "remove", "metaData", "protocol"))
}
}
}
}
}

/**
@@ -26,18 +26,13 @@ import scala.sys.process.Process
// scalastyle:off import.ordering.noEmptyLine
import org.apache.spark.sql.delta.DeltaErrors.generateDocsLink
import org.apache.spark.sql.delta.actions.{Action, Protocol, ProtocolDowngradeException}
import org.apache.spark.sql.delta.actions.Metadata
import org.apache.spark.sql.delta.catalog.DeltaCatalog
import org.apache.spark.sql.delta.catalog.DeltaTableV2
import org.apache.spark.sql.delta.constraints.CharVarcharConstraint
import org.apache.spark.sql.delta.constraints.Constraints
import org.apache.spark.sql.delta.constraints.Constraints.NotNull
import org.apache.spark.sql.delta.constraints.Invariants
import org.apache.spark.sql.delta.constraints.Invariants.PersistedRule
import org.apache.spark.sql.delta.hooks.PostCommitHook
import org.apache.spark.sql.delta.schema.{DeltaInvariantViolationException, InvariantViolationException, SchemaMergingUtils, SchemaUtils, UnsupportedDataTypeInfo}
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.util.JsonUtils
import io.delta.sql.DeltaSparkSessionExtension
import org.apache.hadoop.fs.Path
import org.json4s.JString
@@ -54,7 +49,7 @@ import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.connector.catalog.Identifier
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils}
import org.apache.spark.sql.types.{CalendarIntervalType, DataTypes, DateType, IntegerType, MetadataBuilder, NullType, StringType, StructField, StructType, TimestampNTZType}
import org.apache.spark.sql.types.{CalendarIntervalType, DataTypes, DateType, IntegerType, StringType, StructField, StructType, TimestampNTZType}

trait DeltaErrorsSuiteBase
extends QueryTest
@@ -146,8 +141,7 @@ trait DeltaErrorsSuiteBase
testUrls()
}


test("test DeltaErrors OSS methods") {
test("test DeltaErrors methods") {
{
val e = intercept[DeltaIllegalStateException] {
throw DeltaErrors.tableAlreadyContainsCDCColumns(Seq("col1", "col2"))