diff --git a/core/src/main/scala/io/delta/tables/execution/DeltaTableOperations.scala b/core/src/main/scala/io/delta/tables/execution/DeltaTableOperations.scala
index ac1fd45eeb..b0b0d2747a 100644
--- a/core/src/main/scala/io/delta/tables/execution/DeltaTableOperations.scala
+++ b/core/src/main/scala/io/delta/tables/execution/DeltaTableOperations.scala
@@ -17,9 +17,8 @@ package io.delta.tables.execution
 
 import scala.collection.Map
-
-import org.apache.spark.sql.delta.{DeltaLog, RestoreTableCommand}
-import org.apache.spark.sql.delta.commands.{DeltaGenerateCommand, VacuumCommand}
+import org.apache.spark.sql.delta.DeltaLog
+import org.apache.spark.sql.delta.commands.{DeltaGenerateCommand, RestoreTableCommand, VacuumCommand}
 import org.apache.spark.sql.delta.util.AnalysisHelper
 
 import io.delta.tables.DeltaTable
 
diff --git a/core/src/main/scala/org/apache/spark/sql/delta/RestoreTableCommand.scala b/core/src/main/scala/org/apache/spark/sql/delta/commands/RestoreTableCommand.scala
similarity index 83%
rename from core/src/main/scala/org/apache/spark/sql/delta/RestoreTableCommand.scala
rename to core/src/main/scala/org/apache/spark/sql/delta/commands/RestoreTableCommand.scala
index be6a5a8f08..501587caf6 100644
--- a/core/src/main/scala/org/apache/spark/sql/delta/RestoreTableCommand.scala
+++ b/core/src/main/scala/org/apache/spark/sql/delta/commands/RestoreTableCommand.scala
@@ -14,7 +14,7 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql.delta
+package org.apache.spark.sql.delta.commands
 
 import java.sql.Timestamp
 
@@ -23,12 +23,11 @@ import scala.util.{Success, Try}
 
 import org.apache.spark.sql.delta.DeltaErrors.timestampInvalid
 import org.apache.spark.sql.delta.actions.{AddFile, RemoveFile}
-import org.apache.spark.sql.delta.commands.DeltaCommand
 import org.apache.spark.sql.delta.util.DeltaFileOperations.absolutePath
-
 import org.apache.spark.sql.{Dataset, Row, SparkSession}
 import org.apache.spark.sql.catalyst.expressions.Literal
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.delta.{DeltaErrors, DeltaLog, DeltaOperations, Snapshot}
 import org.apache.spark.sql.execution.command.LeafRunnableCommand
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.IGNORE_MISSING_FILES
@@ -36,6 +35,7 @@ import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.SerializableConfiguration
 
 /**
+ *
  * Perform restore of delta table to a specified version or timestamp
 *
 * Algorithm:
@@ -52,6 +52,7 @@ import org.apache.spark.util.SerializableConfiguration
 * 7) If table was modified in parallel then ignore restore and raise exception.
 *
 */
+
 case class RestoreTableCommand(
     deltaLog: DeltaLog,
     version: Option[Long],
@@ -91,9 +92,6 @@ case class RestoreTableCommand(
             "left_anti")
           .as[AddFile]
           .map(_.copy(dataChange = true))
-          // To avoid recompute of Dataset with wide transformation by toLocalIterator and
-          // checkSnapshotFilesAvailability method with spark.sql.files.ignoreMissingFiles=false
-          .cache()
 
         val filesToRemove = latestSnapshotFiles
           .join(
@@ -102,36 +100,45 @@ case class RestoreTableCommand(
             "left_anti")
           .as[AddFile]
           .map(_.removeWithTimestamp())
-          // To avoid recompute of Dataset with wide transformation by toLocalIterator
-          .cache()
 
-        try {
+        val ignoreMissingFiles = spark
+          .sessionState
+          .conf
+          .getConf(IGNORE_MISSING_FILES)
+
+        if (!ignoreMissingFiles) {
           checkSnapshotFilesAvailability(deltaLog, filesToAdd, versionToRestore)
+        }
 
-          // Commit files, metrics, protocol and metadata to delta log
-          val metrics = computeMetrics(filesToAdd, filesToRemove, snapshotToRestore)
-          val addActions = filesToAdd.toLocalIterator().asScala
-          val removeActions = filesToRemove.toLocalIterator().asScala
-
-          txn.updateMetadata(snapshotToRestore.metadata)
-
-          commitLarge(
-            spark,
-            txn,
-            addActions ++ removeActions,
-            DeltaOperations.Restore(version, timestamp),
-            Map.empty,
-            metrics)
-        } finally {
-          filesToAdd.unpersist()
-          filesToRemove.unpersist()
+        val metrics = withDescription("metrics") {
+          computeMetrics(filesToAdd, filesToRemove, snapshotToRestore)
+        }
+        val addActions = withDescription("add actions") {
+          filesToAdd.toLocalIterator().asScala
+        }
+        val removeActions = withDescription("remove actions") {
+          filesToRemove.toLocalIterator().asScala
         }
-      }
 
+        txn.updateMetadata(snapshotToRestore.metadata)
+        // Commit files, metrics, protocol and metadata to delta log
+        commitLarge(
+          spark,
+          txn,
+          addActions ++ removeActions,
+          DeltaOperations.Restore(version, timestamp),
+          Map.empty,
+          metrics)
+      }
       Seq.empty[Row]
     }
   }
 
+  private def withDescription[T](action: String)(f: => T): T = withStatusCode("DELTA",
+    s"RestoreTableCommand: compute $action (table path ${deltaLog.dataPath})") {
+      f
+  }
+
   private def parseStringToTs(timestamp: Option[String]): Timestamp = {
     Try {
       timestamp.flatMap { tsStr =>
@@ -175,15 +182,10 @@
   private def checkSnapshotFilesAvailability(
       deltaLog: DeltaLog,
       files: Dataset[AddFile],
-      version: Long): Unit = {
+      version: Long): Unit = withDescription("missing files validation") {
 
-    implicit val spark: SparkSession = files.sparkSession
-    val ignore = spark
-      .sessionState
-      .conf
-      .getConf(IGNORE_MISSING_FILES)
+    implicit val spark: SparkSession = files.sparkSession
 
-    if (!ignore) {
       val path = deltaLog.dataPath
       val hadoopConf = spark.sparkContext.broadcast(
         new SerializableConfiguration(deltaLog.newDeltaHadoopConf()))
@@ -202,6 +204,5 @@
       if (missedFiles.nonEmpty) {
         throw DeltaErrors.restoreMissedDataFilesError(missedFiles, version)
       }
-    }
   }
 }
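
Reviewer note: a quick way to exercise the moved command end to end is through the
public DeltaTable API that DeltaTableOperations backs. This is a minimal sketch, not
part of the patch, assuming a Delta build that exposes DeltaTable.restoreToVersion;
the app name, table path, and version number are illustrative only:

    import io.delta.tables.DeltaTable
    import org.apache.spark.sql.SparkSession

    // Spark session with the Delta extensions wired in.
    val spark = SparkSession.builder()
      .appName("restore-smoke-test")
      .master("local[*]")
      .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
      .config("spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog")
      .getOrCreate()

    // Roll the table back to version 0; this plans and runs RestoreTableCommand,
    // which now lives in org.apache.spark.sql.delta.commands.
    DeltaTable.forPath(spark, "/tmp/delta/events").restoreToVersion(0)

The committed actions are unchanged by this refactor: the IGNORE_MISSING_FILES read
is hoisted out of checkSnapshotFilesAvailability into run, the cache()/unpersist()
pair around filesToAdd/filesToRemove is dropped (trading possible recomputation of
the two Datasets for simpler lifecycle handling), and the new withDescription helper
wraps each expensive step in withStatusCode so the Spark UI labels the metrics,
add-action, remove-action, and missing-files-validation jobs.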