From 9dc27edba59caa420c7c47cc14797f720c9e5cf1 Mon Sep 17 00:00:00 2001
From: Maksym Dovhal
Date: Wed, 26 Jan 2022 20:33:22 +0200
Subject: [PATCH 1/4] Refactoring and optimization of RestoreTableCommand

* RestoreTableCommand moved to the org.apache.spark.sql.delta.commands package
* cache() of the filesToRemove DataFrame removed (according to
  https://github.com/delta-io/delta/pull/863#issuecomment-1015672532)
* cache() of filesToAdd is applied only when
  spark.sql.files.ignoreMissingFiles = false (the default value)

Signed-off-by: Maksym Dovhal
---
 .../tables/execution/DeltaTableOperations.scala    |  5 ++---
 .../delta/{ => commands}/RestoreTableCommand.scala | 12 ++++--------
 2 files changed, 6 insertions(+), 11 deletions(-)
 rename core/src/main/scala/org/apache/spark/sql/delta/{ => commands}/RestoreTableCommand.scala (94%)

diff --git a/core/src/main/scala/io/delta/tables/execution/DeltaTableOperations.scala b/core/src/main/scala/io/delta/tables/execution/DeltaTableOperations.scala
index ac1fd45eeb..b0b0d2747a 100644
--- a/core/src/main/scala/io/delta/tables/execution/DeltaTableOperations.scala
+++ b/core/src/main/scala/io/delta/tables/execution/DeltaTableOperations.scala
@@ -17,9 +17,8 @@
 package io.delta.tables.execution
 
 import scala.collection.Map
-
-import org.apache.spark.sql.delta.{DeltaLog, RestoreTableCommand}
-import org.apache.spark.sql.delta.commands.{DeltaGenerateCommand, VacuumCommand}
+import org.apache.spark.sql.delta.DeltaLog
+import org.apache.spark.sql.delta.commands.{DeltaGenerateCommand, RestoreTableCommand, VacuumCommand}
 import org.apache.spark.sql.delta.util.AnalysisHelper
 import io.delta.tables.DeltaTable
 
diff --git a/core/src/main/scala/org/apache/spark/sql/delta/RestoreTableCommand.scala b/core/src/main/scala/org/apache/spark/sql/delta/commands/RestoreTableCommand.scala
similarity index 94%
rename from core/src/main/scala/org/apache/spark/sql/delta/RestoreTableCommand.scala
rename to core/src/main/scala/org/apache/spark/sql/delta/commands/RestoreTableCommand.scala
index be6a5a8f08..12a5d5f95f 100644
--- a/core/src/main/scala/org/apache/spark/sql/delta/RestoreTableCommand.scala
+++ b/core/src/main/scala/org/apache/spark/sql/delta/commands/RestoreTableCommand.scala
@@ -14,7 +14,7 @@
  * limitations under the License.
 */
 
-package org.apache.spark.sql.delta
+package org.apache.spark.sql.delta.commands
 
 import java.sql.Timestamp
 
@@ -23,12 +23,12 @@ import scala.util.{Success, Try}
 
 import org.apache.spark.sql.delta.DeltaErrors.timestampInvalid
 import org.apache.spark.sql.delta.actions.{AddFile, RemoveFile}
-import org.apache.spark.sql.delta.commands.DeltaCommand
 import org.apache.spark.sql.delta.util.DeltaFileOperations.absolutePath
 
 import org.apache.spark.sql.{Dataset, Row, SparkSession}
 import org.apache.spark.sql.catalyst.expressions.Literal
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.delta.{DeltaErrors, DeltaLog, DeltaOperations, Snapshot}
 import org.apache.spark.sql.execution.command.LeafRunnableCommand
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.IGNORE_MISSING_FILES
@@ -91,9 +91,6 @@ case class RestoreTableCommand(
         "left_anti")
       .as[AddFile]
       .map(_.copy(dataChange = true))
-      // To avoid recompute of Dataset with wide transformation by toLocalIterator and
-      // checkSnapshotFilesAvailability method with spark.sql.files.ignoreMissingFiles=false
-      .cache()
 
     val filesToRemove = latestSnapshotFiles
       .join(
@@ -102,8 +99,6 @@
         "left_anti")
       .as[AddFile]
       .map(_.removeWithTimestamp())
-      // To avoid recompute of Dataset with wide transformation by toLocalIterator
-      .cache()
 
     try {
       checkSnapshotFilesAvailability(deltaLog, filesToAdd, versionToRestore)
@@ -124,7 +119,6 @@
         metrics)
     } finally {
       filesToAdd.unpersist()
-      filesToRemove.unpersist()
     }
   }
 
@@ -184,6 +178,8 @@
       .getConf(IGNORE_MISSING_FILES)
 
     if (!ignore) {
+      // To avoid recompute of files Dataset in calling method
+      files.cache()
       val path = deltaLog.dataPath
       val hadoopConf = spark.sparkContext.broadcast(
         new SerializableConfiguration(deltaLog.newDeltaHadoopConf()))

From 1dcebddeeeb8949d5d63604ec1539435aa176231 Mon Sep 17 00:00:00 2001
From: Maksym Dovhal
Date: Fri, 28 Jan 2022 21:55:59 +0200
Subject: [PATCH 2/4] Refactoring to resolve review comments

---
 .../delta/commands/RestoreTableCommand.scala | 52 +++++++++----------
 1 file changed, 26 insertions(+), 26 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/sql/delta/commands/RestoreTableCommand.scala b/core/src/main/scala/org/apache/spark/sql/delta/commands/RestoreTableCommand.scala
index 12a5d5f95f..299bd9ec51 100644
--- a/core/src/main/scala/org/apache/spark/sql/delta/commands/RestoreTableCommand.scala
+++ b/core/src/main/scala/org/apache/spark/sql/delta/commands/RestoreTableCommand.scala
@@ -100,8 +100,16 @@ case class RestoreTableCommand(
       .as[AddFile]
       .map(_.removeWithTimestamp())
 
+    val notIgnoreMissingFiles = !spark
+      .sessionState
+      .conf
+      .getConf(IGNORE_MISSING_FILES)
+
     try {
-      checkSnapshotFilesAvailability(deltaLog, filesToAdd, versionToRestore)
+      if (notIgnoreMissingFiles) {
+        filesToAdd.cache() // To avoid recompute in commitLarge
+        checkSnapshotFilesAvailability(deltaLog, filesToAdd, versionToRestore)
+      }
 
       // Commit files, metrics, protocol and metadata to delta log
       val metrics = computeMetrics(filesToAdd, filesToRemove, snapshotToRestore)
@@ -118,7 +126,7 @@
         Map.empty,
         metrics)
     } finally {
-      filesToAdd.unpersist()
+      if (notIgnoreMissingFiles) filesToAdd.unpersist()
     }
   }
 
@@ -172,32 +180,24 @@
       version: Long): Unit = {
 
     implicit val spark: SparkSession = files.sparkSession
 
-    val ignore = spark
-      .sessionState
-      .conf
-      .getConf(IGNORE_MISSING_FILES)
-
-    if (!ignore) {
-      // To avoid recompute of files Dataset in calling method
-      files.cache()
-      val path = deltaLog.dataPath
-      val hadoopConf = spark.sparkContext.broadcast(
-        new SerializableConfiguration(deltaLog.newDeltaHadoopConf()))
-
-      import spark.implicits._
-
-      val missedFiles = files
-        .mapPartitions { files =>
-          val fs = path.getFileSystem(hadoopConf.value.value)
-          val pathStr = path.toUri.getPath
-          files.filterNot(f => fs.exists(absolutePath(pathStr, f.path)))
-        }
-        .map(_.path)
-        .head(100)
-
-      if (missedFiles.nonEmpty) {
-        throw DeltaErrors.restoreMissedDataFilesError(missedFiles, version)
+    val path = deltaLog.dataPath
+    val hadoopConf = spark.sparkContext.broadcast(
+      new SerializableConfiguration(deltaLog.newDeltaHadoopConf()))
+
+    import spark.implicits._
+
+    val missedFiles = files
+      .mapPartitions { files =>
+        val fs = path.getFileSystem(hadoopConf.value.value)
+        val pathStr = path.toUri.getPath
+        files.filterNot(f => fs.exists(absolutePath(pathStr, f.path)))
       }
+      .map(_.path)
+      .head(100)
+
+    if (missedFiles.nonEmpty) {
+      throw DeltaErrors.restoreMissedDataFilesError(missedFiles, version)
     }
   }
 }

From 7f2128efbc309a9134fbb870f268d6e0ee277d54 Mon Sep 17 00:00:00 2001
From: Maksym Dovhal
Date: Sun, 30 Jan 2022 17:29:00 +0200
Subject: [PATCH 3/4] Removed cache for filesToAdd. Added better job
 description for Spark UI

---
 .../delta/commands/RestoreTableCommand.scala | 89 ++++++++++---------
 1 file changed, 47 insertions(+), 42 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/sql/delta/commands/RestoreTableCommand.scala b/core/src/main/scala/org/apache/spark/sql/delta/commands/RestoreTableCommand.scala
index 299bd9ec51..e93dd2516e 100644
--- a/core/src/main/scala/org/apache/spark/sql/delta/commands/RestoreTableCommand.scala
+++ b/core/src/main/scala/org/apache/spark/sql/delta/commands/RestoreTableCommand.scala
@@ -21,10 +21,10 @@ import java.sql.Timestamp
 import scala.collection.JavaConverters._
 import scala.util.{Success, Try}
 
+import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.sql.delta.DeltaErrors.timestampInvalid
 import org.apache.spark.sql.delta.actions.{AddFile, RemoveFile}
 import org.apache.spark.sql.delta.util.DeltaFileOperations.absolutePath
-
 import org.apache.spark.sql.{Dataset, Row, SparkSession}
 import org.apache.spark.sql.catalyst.expressions.Literal
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
@@ -36,6 +36,8 @@ import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.SerializableConfiguration
 
 /**
+ * :: DeveloperAPI ::
+ *
  * Perform restore of delta table to a specified version or timestamp
  *
  * Algorithm:
@@ -52,6 +54,8 @@
 * 7) If table was modified in parallel then ignore restore and raise exception.
 *
 */
+
+@DeveloperApi
 case class RestoreTableCommand(
     deltaLog: DeltaLog,
     version: Option[Long],
@@ -100,40 +104,41 @@ case class RestoreTableCommand(
       .as[AddFile]
       .map(_.removeWithTimestamp())
 
-    val notIgnoreMissingFiles = !spark
+    val ignoreMissingFiles = spark
       .sessionState
       .conf
       .getConf(IGNORE_MISSING_FILES)
 
-    try {
-      if (notIgnoreMissingFiles) {
-        filesToAdd.cache() // To avoid recompute in commitLarge
-        checkSnapshotFilesAvailability(deltaLog, filesToAdd, versionToRestore)
-      }
-
-      // Commit files, metrics, protocol and metadata to delta log
-      val metrics = computeMetrics(filesToAdd, filesToRemove, snapshotToRestore)
-      val addActions = filesToAdd.toLocalIterator().asScala
-      val removeActions = filesToRemove.toLocalIterator().asScala
-
-      txn.updateMetadata(snapshotToRestore.metadata)
-
-      commitLarge(
-        spark,
-        txn,
-        addActions ++ removeActions,
-        DeltaOperations.Restore(version, timestamp),
-        Map.empty,
-        metrics)
-    } finally {
-      if (notIgnoreMissingFiles) filesToAdd.unpersist()
+    if (!ignoreMissingFiles) {
+      checkSnapshotFilesAvailability(deltaLog, filesToAdd, versionToRestore)
     }
 
+    val metrics = withDescription("metrics") {
+      computeMetrics(filesToAdd, filesToRemove, snapshotToRestore) }
+    val addActions = withDescription("add actions") {
+      filesToAdd.toLocalIterator().asScala }
+    val removeActions = withDescription("remove actions") {
+      filesToRemove.toLocalIterator().asScala }
+
+    txn.updateMetadata(snapshotToRestore.metadata)
+    // Commit files, metrics, protocol and metadata to delta log
+    commitLarge(
+      spark,
+      txn,
+      addActions ++ removeActions,
+      DeltaOperations.Restore(version, timestamp),
+      Map.empty,
+      metrics)
+    }
 
     Seq.empty[Row]
   }
 
+  private def withDescription[T](action: String)(f: => T): T = withStatusCode("Delta:",
+    s"RestoreTableCommand: $action (table path ${deltaLog.dataPath})") {
+    f
+  }
+
   private def parseStringToTs(timestamp: Option[String]): Timestamp = {
     Try {
       timestamp.flatMap { tsStr =>
@@ -177,27 +182,27 @@ case class RestoreTableCommand(
   private def checkSnapshotFilesAvailability(
       deltaLog: DeltaLog,
       files: Dataset[AddFile],
-      version: Long): Unit = {
+      version: Long): Unit = withDescription("missing files validation") {
 
-    implicit val spark: SparkSession = files.sparkSession
+      implicit val spark: SparkSession = files.sparkSession
 
-    val path = deltaLog.dataPath
-    val hadoopConf = spark.sparkContext.broadcast(
-      new SerializableConfiguration(deltaLog.newDeltaHadoopConf()))
+      val path = deltaLog.dataPath
+      val hadoopConf = spark.sparkContext.broadcast(
+        new SerializableConfiguration(deltaLog.newDeltaHadoopConf()))
 
-    import spark.implicits._
+      import spark.implicits._
 
-    val missedFiles = files
-      .mapPartitions { files =>
-        val fs = path.getFileSystem(hadoopConf.value.value)
-        val pathStr = path.toUri.getPath
-        files.filterNot(f => fs.exists(absolutePath(pathStr, f.path)))
-      }
-      .map(_.path)
-      .head(100)
+      val missedFiles = files
+        .mapPartitions { files =>
+          val fs = path.getFileSystem(hadoopConf.value.value)
+          val pathStr = path.toUri.getPath
+          files.filterNot(f => fs.exists(absolutePath(pathStr, f.path)))
+        }
+        .map(_.path)
+        .head(100)
 
-    if (missedFiles.nonEmpty) {
-      throw DeltaErrors.restoreMissedDataFilesError(missedFiles, version)
-    }
+      if (missedFiles.nonEmpty) {
+        throw DeltaErrors.restoreMissedDataFilesError(missedFiles, version)
+      }
   }
 }

From 3f402b981b122862070d0926a3d83335192ce845 Mon Sep 17 00:00:00 2001
From: Maksym Dovhal
Date: Mon, 31 Jan 2022 17:10:51 +0200
Subject: [PATCH 4/4] Fixed review comments

---
 .../sql/delta/commands/RestoreTableCommand.scala | 16 ++++++++--------
 1 file changed, 8 insertions(+), 8 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/sql/delta/commands/RestoreTableCommand.scala b/core/src/main/scala/org/apache/spark/sql/delta/commands/RestoreTableCommand.scala
index e93dd2516e..501587caf6 100644
--- a/core/src/main/scala/org/apache/spark/sql/delta/commands/RestoreTableCommand.scala
+++ b/core/src/main/scala/org/apache/spark/sql/delta/commands/RestoreTableCommand.scala
@@ -21,7 +21,6 @@ import java.sql.Timestamp
 import scala.collection.JavaConverters._
 import scala.util.{Success, Try}
 
-import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.sql.delta.DeltaErrors.timestampInvalid
 import org.apache.spark.sql.delta.actions.{AddFile, RemoveFile}
 import org.apache.spark.sql.delta.util.DeltaFileOperations.absolutePath
@@ -36,7 +35,6 @@ import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.SerializableConfiguration
 
 /**
- * :: DeveloperAPI ::
  *
  * Perform restore of delta table to a specified version or timestamp
 
@@ -55,7 +53,6 @@
 *
 */
 
-@DeveloperApi
 case class RestoreTableCommand(
     deltaLog: DeltaLog,
     version: Option[Long],
@@ -114,11 +111,14 @@
     }
 
     val metrics = withDescription("metrics") {
-      computeMetrics(filesToAdd, filesToRemove, snapshotToRestore) }
+      computeMetrics(filesToAdd, filesToRemove, snapshotToRestore)
+    }
     val addActions = withDescription("add actions") {
-      filesToAdd.toLocalIterator().asScala }
+      filesToAdd.toLocalIterator().asScala
+    }
     val removeActions = withDescription("remove actions") {
-      filesToRemove.toLocalIterator().asScala }
+      filesToRemove.toLocalIterator().asScala
+    }
 
     txn.updateMetadata(snapshotToRestore.metadata)
     // Commit files, metrics, protocol and metadata to delta log
@@ -134,8 +134,8 @@
     }
   }
 
-  private def withDescription[T](action: String)(f: => T): T = withStatusCode("Delta:",
-    s"RestoreTableCommand: $action (table path ${deltaLog.dataPath})") {
+  private def withDescription[T](action: String)(f: => T): T = withStatusCode("DELTA",
+    s"RestoreTableCommand: compute $action (table path ${deltaLog.dataPath})") {
     f
   }
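
Usage sketch (not part of the patch series): the refactored command is reached
through the public DeltaTable API backed by DeltaTableOperations (patch 1).
The snippet below is a minimal illustration assuming the Delta Lake 1.x API;
DeltaTable.forPath and restoreToVersion are assumed entry points, and the
table path /tmp/delta/events is a placeholder.

  import io.delta.tables.DeltaTable
  import org.apache.spark.sql.SparkSession

  object RestoreSketch {
    def main(args: Array[String]): Unit = {
      val spark = SparkSession.builder()
        .appName("restore-sketch")
        .master("local[*]")
        .getOrCreate()

      // Keep the default (false) so RestoreTableCommand validates that every
      // data file of the target version still exists before committing.
      spark.conf.set("spark.sql.files.ignoreMissingFiles", "false")

      // Hypothetical table path; restoreToVersion(5) rolls the table state
      // back to version 5 by committing the add/remove actions the command
      // computes against the latest snapshot.
      val table = DeltaTable.forPath(spark, "/tmp/delta/events")
      table.restoreToVersion(5)

      spark.stop()
    }
  }

With spark.sql.files.ignoreMissingFiles left at its default, the command runs
the missing-files validation before committing and raises
restoreMissedDataFilesError if any data file of the target version is gone.
After patch 4, the validation, metrics, and add/remove action jobs appear in
the Spark UI with descriptions of the form
"RestoreTableCommand: compute metrics (table path ...)".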