From af4f37c94ce47c28e3c324499463e032b68d454d Mon Sep 17 00:00:00 2001
From: WweiL
Date: Thu, 5 Dec 2024 12:03:32 +0900
Subject: [PATCH] [SPARK-50339][SPARK-50360][SS] Enable changelog to store lineage information

### What changes were proposed in this pull request?

Break down https://github.com/apache/spark/pull/48355 into smaller PRs.

## Changelog Reader / Writer

We propose to save the lineage to the first line of the changelog files.

For the changelog reader, an abstract function `readLineage` is added. In the `RocksDBFileManager.getChangelogReader` function, `readLineage` is called right after the changelog reader is initialized, advancing the file pointer past the lineage. Subsequent `getNext` calls are not affected by this.

For the changelog writer, an abstract function `writeLineage` writes the lineage. This function is called before any actual changelog data is written, in `RocksDB.load()`.

The lineage is stored as JSON (see the sketches after the patch).

### Why are the changes needed?

Continue development of SPARK-49374

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Added unit test

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #48880 from WweiL/changelog.

Authored-by: WweiL
Signed-off-by: Jungtaek Lim
--- .../resources/error/error-conditions.json | 2 +- .../execution/streaming/state/RocksDB.scala | 2 +- .../streaming/state/RocksDBFileManager.scala | 73 +++-- .../streaming/state/StateStoreChangelog.scala | 243 +++++++++++++- ...ateDataSourceTransformWithStateSuite.scala | 4 +- .../streaming/state/RocksDBSuite.scala | 304 +++++++++++++++++- 6 files changed, 565 insertions(+), 63 deletions(-) diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json index 63c4d18c99de9..b18db93f6291a 100644 --- a/common/utils/src/main/resources/error/error-conditions.json +++ b/common/utils/src/main/resources/error/error-conditions.json @@ -275,7 +275,7 @@ }, "INVALID_CHANGE_LOG_READER_VERSION" : { "message" : [ - "The change log reader version cannot be ." + "The change log reader version cannot be . The checkpoint probably is from a future Spark version, please upgrade your Spark."
] }, "INVALID_CHANGE_LOG_WRITER_VERSION" : { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala index f8e9885cef14e..709197cd56527 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala @@ -440,7 +440,7 @@ class RocksDB( log"${MDC(LogKeys.VERSION_NUM, v)}") var changelogReader: StateStoreChangelogReader = null try { - changelogReader = fileManager.getChangelogReader(v, useColumnFamilies) + changelogReader = fileManager.getChangelogReader(v) changelogReader.foreach { case (recordType, key, value) => recordType match { case RecordType.PUT_RECORD => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala index 6b13ff31c9d50..483e4a32cd85b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala @@ -153,58 +153,73 @@ class RocksDBFileManager( @volatile private var rootDirChecked: Boolean = false private val versionToRocksDBFiles = new ConcurrentHashMap[Long, Seq[RocksDBImmutableFile]] - private def getChangelogVersion(useColumnFamilies: Boolean): Short = { - val changelogVersion: Short = if (useColumnFamilies) { - 2 - } else { - 1 + /** + * Get the changelog version based on rocksDB features. + * @return the version of changelog + */ + private def getChangelogWriterVersion( + useColumnFamilies: Boolean, + stateStoreCheckpointIdEnabled: Boolean): Short = { + (useColumnFamilies, stateStoreCheckpointIdEnabled) match { + case (false, false) => 1 + case (true, false) => 2 + case (false, true) => 3 + case _ => 4 } - changelogVersion } def getChangeLogWriter( version: Long, - useColumnFamilies: Boolean = false): StateStoreChangelogWriter = { - val changelogFile = dfsChangelogFile(version) + useColumnFamilies: Boolean = false, + checkpointUniqueId: Option[String] = None, + stateStoreCheckpointIdLineage: Option[Array[LineageItem]] = None + ): StateStoreChangelogWriter = { + val changelogFile = dfsChangelogFile(version, checkpointUniqueId) if (!rootDirChecked) { val rootDir = new Path(dfsRootDir) if (!fm.exists(rootDir)) fm.mkdirs(rootDir) rootDirChecked = true } - val changelogVersion = getChangelogVersion(useColumnFamilies) + val enableStateStoreCheckpointIds = checkpointUniqueId.isDefined + val changelogVersion = getChangelogWriterVersion( + useColumnFamilies, enableStateStoreCheckpointIds) val changelogWriter = changelogVersion match { case 1 => new StateStoreChangelogWriterV1(fm, changelogFile, codec) case 2 => new StateStoreChangelogWriterV2(fm, changelogFile, codec) + case 3 => + assert(enableStateStoreCheckpointIds && stateStoreCheckpointIdLineage.isDefined, + "StateStoreChangelogWriterV3 should only be initialized when " + + "state store checkpoint unique id is enabled") + new StateStoreChangelogWriterV3(fm, changelogFile, codec, stateStoreCheckpointIdLineage.get) + case 4 => + assert(enableStateStoreCheckpointIds && stateStoreCheckpointIdLineage.isDefined, + "StateStoreChangelogWriterV4 should only be initialized when " + + "state store checkpoint unique id is enabled") + new StateStoreChangelogWriterV4(fm, changelogFile, codec, 
stateStoreCheckpointIdLineage.get) case _ => throw QueryExecutionErrors.invalidChangeLogWriterVersion(changelogVersion) } + + logInfo(log"Loaded change log reader version " + + log"${MDC(LogKeys.FILE_VERSION, changelogWriter.version)}") + changelogWriter } // Get the changelog file at version def getChangelogReader( version: Long, - useColumnFamilies: Boolean = false): StateStoreChangelogReader = { - val changelogFile = dfsChangelogFile(version) - - // Note that ideally we should get the version for the reader from the - // changelog itself. However, since we don't record this for v1, we need to - // rely on external arguments to make this call today. Within the reader, we verify - // for the correctness of the decided/expected version. We might revisit this pattern - // as we add more changelog versions in the future. - val changelogVersion = getChangelogVersion(useColumnFamilies) - val changelogReader = changelogVersion match { - case 1 => - new StateStoreChangelogReaderV1(fm, changelogFile, codec) - case 2 => - new StateStoreChangelogReaderV2(fm, changelogFile, codec) - case _ => - throw QueryExecutionErrors.invalidChangeLogReaderVersion(changelogVersion) - } - changelogReader + checkpointUniqueId: Option[String] = None): StateStoreChangelogReader = { + val changelogFile = dfsChangelogFile(version, checkpointUniqueId) + val reader = new StateStoreChangelogReaderFactory(fm, changelogFile, codec) + .constructChangelogReader() + + logInfo(log"Loaded change log reader version ${MDC(LogKeys.FILE_VERSION, reader.version)}") + + reader } /** @@ -777,7 +792,9 @@ class RocksDBFileManager( private def dfsBatchZipFile(version: Long): Path = new Path(s"$dfsRootDir/$version.zip") // We use changelog suffix intentionally so that we can tell the difference from changelog file of // HDFSBackedStateStore which is named version.delta. - private def dfsChangelogFile(version: Long): Path = new Path(s"$dfsRootDir/$version.changelog") + private def dfsChangelogFile(version: Long, checkpointUniqueId: Option[String] = None): Path = + checkpointUniqueId.map(id => new Path(s"$dfsRootDir/${version}_$id.changelog")) + .getOrElse(new Path(s"$dfsRootDir/$version.changelog")) private def localMetadataFile(parentDir: File): File = new File(parentDir, "metadata") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala index 203af9d10217e..f6787a37bc80d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala @@ -24,6 +24,8 @@ import scala.util.control.NonFatal import com.google.common.io.ByteStreams import org.apache.commons.io.IOUtils import org.apache.hadoop.fs.{FSError, Path} +import org.json4s._ +import org.json4s.jackson.Serialization import org.apache.spark.internal.{Logging, MDC} import org.apache.spark.internal.LogKeys._ @@ -78,6 +80,14 @@ object RecordType extends Enumeration { } } +/** + * Class for lineage item for checkpoint format V2. 
+ */ +case class LineageItem( + version: Long, + checkpointUniqueId: String +) + /** * Base class for state store changelog writer * @param fm - checkpoint file manager used to manage streaming query checkpoint @@ -89,18 +99,27 @@ abstract class StateStoreChangelogWriter( file: Path, compressionCodec: CompressionCodec) extends Logging { + implicit val formats: Formats = DefaultFormats + private def compressStream(outputStream: DataOutputStream): DataOutputStream = { val compressed = compressionCodec.compressedOutputStream(outputStream) new DataOutputStream(compressed) } + protected var backingFileStream: CancellableFSDataOutputStream = + fm.createAtomic(file, overwriteIfPossible = true) + protected var compressedStream: DataOutputStream = compressStream(backingFileStream) + protected def writeVersion(): Unit = { compressedStream.writeUTF(s"v${version}") } - protected var backingFileStream: CancellableFSDataOutputStream = - fm.createAtomic(file, overwriteIfPossible = true) - protected var compressedStream: DataOutputStream = compressStream(backingFileStream) + protected def writeLineage(stateStoreCheckpointIdLineage: Array[LineageItem]): Unit = { + assert(version >= 3, + "writeLineage should only be invoked with state store checkpoint id enabled (version >= 3)") + val lineageStr = Serialization.write(stateStoreCheckpointIdLineage) + compressedStream.writeUTF(lineageStr) + } def version: Short @@ -115,9 +134,9 @@ abstract class StateStoreChangelogWriter( if (backingFileStream != null) backingFileStream.cancel() if (compressedStream != null) IOUtils.closeQuietly(compressedStream) } catch { - // Closing the compressedStream causes the stream to write/flush flush data into the + // Closing the compressedStream causes the stream to write/flush data into the // rawStream. Since the rawStream is already closed, there may be errors. - // Usually its an IOException. However, Hadoop's RawLocalFileSystem wraps + // Usually it's an IOException. However, Hadoop's RawLocalFileSystem wraps // IOException into FSError. case e: FSError if e.getCause.isInstanceOf[IOException] => case NonFatal(ex) => @@ -152,15 +171,15 @@ class StateStoreChangelogWriterV1( override def put(key: Array[Byte], value: Array[Byte]): Unit = { assert(compressedStream != null) - compressedStream.writeInt(key.size) + compressedStream.writeInt(key.length) compressedStream.write(key) - compressedStream.writeInt(value.size) + compressedStream.writeInt(value.length) compressedStream.write(value) } override def delete(key: Array[Byte]): Unit = { assert(compressedStream != null) - compressedStream.writeInt(key.size) + compressedStream.writeInt(key.length) compressedStream.write(key) // -1 in the value field means record deletion. compressedStream.writeInt(-1) @@ -206,7 +225,7 @@ class StateStoreChangelogWriterV2( override def version: Short = 2 - // append the version field to the changelog file starting from version 2 + // append the version field to the changelog file writeVersion() override def put(key: Array[Byte], value: Array[Byte]): Unit = { @@ -216,7 +235,7 @@ class StateStoreChangelogWriterV2( override def delete(key: Array[Byte]): Unit = { assert(compressedStream != null) compressedStream.write(RecordType.getRecordTypeAsByte(RecordType.DELETE_RECORD)) - compressedStream.writeInt(key.size) + compressedStream.writeInt(key.length) compressedStream.write(key) // -1 in the value field means record deletion. 
compressedStream.writeInt(-1) @@ -232,9 +251,9 @@ class StateStoreChangelogWriterV2( assert(recordType == RecordType.PUT_RECORD || recordType == RecordType.MERGE_RECORD) assert(compressedStream != null) compressedStream.write(RecordType.getRecordTypeAsByte(recordType)) - compressedStream.writeInt(key.size) + compressedStream.writeInt(key.length) compressedStream.write(key) - compressedStream.writeInt(value.size) + compressedStream.writeInt(value.length) compressedStream.write(value) } @@ -255,6 +274,127 @@ class StateStoreChangelogWriterV2( } } +/** + * Write changes to the key value state store instance to a changelog file. + * There are 2 types of records, put and delete. + * A put record is written as: | key length | key content | value length | value content | + * A delete record is written as: | key length | key content | -1 | + * Write an Int -1 to signal the end of file. + * The overall changelog format is: | put record | delete record | ... | put record | -1 | + * V3 is a extension of V1 for writing changelogs with version + * in the first line and lineage in the second line. + */ +class StateStoreChangelogWriterV3( + fm: CheckpointFileManager, + file: Path, + compressionCodec: CompressionCodec, + stateStoreCheckpointIdLineage: Array[LineageItem]) + extends StateStoreChangelogWriterV1(fm, file, compressionCodec) { + + override def version: Short = 3 + + // append the version field to the changelog file + writeVersion() + + // Also write lineage information to the changelog, it should appear + // in the second line for v3 because the first line is the version + writeLineage(stateStoreCheckpointIdLineage) +} + +/** + * Write changes to the key value state store instance to a changelog file. + * There are 3 types of data records, put, merge and delete. + * A put record or merge record is written as: | record type | key length + * | key content | value length | value content | -1 | + * A delete record is written as: | record type | key length | key content | -1 + * Write an EOF_RECORD to signal the end of file. + * The overall changelog format is: version | put record | delete record + * | ... | put record | eof record | + * V4 is a extension of V2 for writing changelogs with version + * in the first line and lineage in the second line. + */ +class StateStoreChangelogWriterV4( + fm: CheckpointFileManager, + file: Path, + compressionCodec: CompressionCodec, + stateStoreCheckpointIdLineage: Array[LineageItem]) + extends StateStoreChangelogWriterV2(fm, file, compressionCodec) { + + override def version: Short = 4 + + // Also write lineage information to the changelog, it should appear + // in the second line for v4 because the first line is the version + writeLineage(stateStoreCheckpointIdLineage) +} + +/** + * A factory class for constructing state store readers by reading the first line + * of the change log file, which stores the version. + * Note that for changelog version 1, there is no version written. 
+ * @param fm - checkpoint file manager used to manage streaming query checkpoint + * @param fileToRead - name of file to use to read changelog + * @param compressionCodec - de-compression method using for reading changelog file + */ +class StateStoreChangelogReaderFactory( + fm: CheckpointFileManager, + fileToRead: Path, + compressionCodec: CompressionCodec) extends Logging { + + private def decompressStream(inputStream: DataInputStream): DataInputStream = { + val compressed = compressionCodec.compressedInputStream(inputStream) + new DataInputStream(compressed) + } + + private lazy val sourceStream = try { + fm.open(fileToRead) + } catch { + case f: FileNotFoundException => + throw QueryExecutionErrors.failedToReadStreamingStateFileError(fileToRead, f) + } + protected val input: DataInputStream = decompressStream(sourceStream) + + private def readVersion(): Short = { + try { + val versionStr = input.readUTF() + // Versions in the first line are prefixed with "v", e.g. "v2" + // Since there is no version written for version 1, + // return 1 if first line doesn't start with "v" + if (!versionStr.startsWith("v")) { + 1 + } else { + versionStr.stripPrefix("v").toShort + } + } catch { + // When there is no record being written in the changelog file in V1, + // the file contains a single int -1 meaning EOF, then the above readUTF() + // throws with EOFException and we return version 1. + case _: java.io.EOFException => 1 + } + } + + /** + * Construct the change log reader based on the version stored in changelog file + * @return StateStoreChangelogReader + */ + def constructChangelogReader(): StateStoreChangelogReader = { + var reader: StateStoreChangelogReader = null + try { + reader = readVersion() match { + case 1 => new StateStoreChangelogReaderV1(fm, fileToRead, compressionCodec) + case 2 => new StateStoreChangelogReaderV2(fm, fileToRead, compressionCodec) + case 3 => new StateStoreChangelogReaderV3(fm, fileToRead, compressionCodec) + case 4 => new StateStoreChangelogReaderV4(fm, fileToRead, compressionCodec) + case version => throw QueryExecutionErrors.invalidChangeLogReaderVersion(version) + } + } finally { + if (input != null) { + input.close() + } + } + reader + } +} + /** * Base class for state store changelog reader * @param fm - checkpoint file manager used to manage streaming query checkpoint @@ -267,12 +407,14 @@ abstract class StateStoreChangelogReader( compressionCodec: CompressionCodec) extends NextIterator[(RecordType.Value, Array[Byte], Array[Byte])] with Logging { + implicit val formats: Formats = DefaultFormats + private def decompressStream(inputStream: DataInputStream): DataInputStream = { val compressed = compressionCodec.compressedInputStream(inputStream) new DataInputStream(compressed) } - private val sourceStream = try { + private lazy val sourceStream = try { fm.open(fileToRead) } catch { case f: FileNotFoundException => @@ -280,6 +422,26 @@ abstract class StateStoreChangelogReader( } protected val input: DataInputStream = decompressStream(sourceStream) + // This function is valid only when called upon initialization, + // because version is written in the first line only for version >= 2. 
+ protected def readVersion(): String = input.readUTF() + + protected def verifyVersion(): Unit = { + // ensure that the version read is correct, also updates file position + val changelogVersionStr = readVersion() + assert(changelogVersionStr == s"v${version}", + s"Changelog version mismatch: $changelogVersionStr != v${version}") + } + + private def readLineage(): Array[LineageItem] = { + assert(version >= 3, + "readLineage should only be invoked with state store checkpoint id enabled (version >= 3)") + val lineageStr = input.readUTF() + Serialization.read[Array[LineageItem]](lineageStr) + } + + lazy val lineage: Array[LineageItem] = readLineage() + def version: Short override protected def close(): Unit = { if (input != null) input.close() } @@ -352,10 +514,7 @@ class StateStoreChangelogReaderV2( override def version: Short = 2 - // ensure that the version read is v2 - val changelogVersionStr = input.readUTF() - assert(changelogVersionStr == "v2", - s"Changelog version mismatch: $changelogVersionStr != v2") + verifyVersion() override def getNext(): (RecordType.Value, Array[Byte], Array[Byte]) = { val recordType = RecordType.getRecordTypeFromByte(input.readByte()) @@ -388,6 +547,56 @@ class StateStoreChangelogReaderV2( } } +/** + * Read an iterator of change record from the changelog file. + * A record is represented by tuple(recordType: RecordType.Value, + * key: Array[Byte], value: Array[Byte]) + * A put record is returned as a tuple(recordType, key, value) + * A delete record is return as a tuple(recordType, key, null) + * V3 is a extension of V1 for reading changelogs with version + * in the first line and lineage in the second line. + */ +class StateStoreChangelogReaderV3( + fm: CheckpointFileManager, + fileToRead: Path, + compressionCodec: CompressionCodec) + extends StateStoreChangelogReaderV1(fm, fileToRead, compressionCodec) { + + override def version: Short = 3 + + verifyVersion() + + // If the changelogFile is written when state store checkpoint unique id is enabled + // the first line would be the version and the second line would be the lineage. + // We should update the file position by reading from the lineage during + // the reader initialization. + lineage +} + +/** + * Read an iterator of change record from the changelog file. + * A record is represented by tuple(recordType: RecordType.Value, + * key: Array[Byte], value: Array[Byte]) + * A put or merge record is returned as a tuple(recordType, key, value) + * A delete record is return as a tuple(recordType, key, null) + * V4 is a extension of V2 for reading changelogs with version + * in the first line and lineage in the second line. + */ +class StateStoreChangelogReaderV4( + fm: CheckpointFileManager, + fileToRead: Path, + compressionCodec: CompressionCodec) + extends StateStoreChangelogReaderV2(fm, fileToRead, compressionCodec) { + + override def version: Short = 4 + + // If the changelogFile is written when state store checkpoint unique id is enabled + // the first line would be the version and the second line would be the lineage. + // We should update the file position by reading from the lineage during + // the reader initialization. + lineage +} + /** * Base class representing a iterator that iterates over a range of changelog files in a state * store. 
In each iteration, it will return a tuple of (changeType: [[RecordType]], diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceTransformWithStateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceTransformWithStateSuite.scala index af64f563cf7b0..1b63180171be8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceTransformWithStateSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceTransformWithStateSuite.scala @@ -1075,7 +1075,7 @@ class StateDataSourceTransformWithStateSuite extends StateStoreMetricsTest // Read the changelog for one of the partitions at version 3 and // ensure that we have two entries // For this test - keys 9 and 12 are written at version 3 for partition 4 - val changelogReader = fileManager.getChangelogReader(3, true) + val changelogReader = fileManager.getChangelogReader(3) val entries = changelogReader.toSeq assert(entries.size == 2) val retainEntry = entries.head @@ -1091,7 +1091,7 @@ class StateDataSourceTransformWithStateSuite extends StateStoreMetricsTest // Ensure that we have only one entry in the changelog for version 3 // For this test - key 9 is retained and key 12 is deleted - val changelogReader1 = fileManager.getChangelogReader(3, true) + val changelogReader1 = fileManager.getChangelogReader(3) val entries1 = changelogReader1.toSeq assert(entries1.size == 1) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala index 61ca8e7c32f61..2b48bc2e501bd 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala @@ -29,14 +29,17 @@ import scala.util.Random import org.apache.commons.io.FileUtils import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.Path +import org.apache.hadoop.fs.{FSDataInputStream, Path} import org.rocksdb.CompressionType import org.scalactic.source.Position +import org.scalatest.PrivateMethodTester import org.scalatest.Tag import org.apache.spark.{SparkConf, SparkException, TaskContext} +import org.apache.spark.internal.Logging +import org.apache.spark.io.CompressionCodec import org.apache.spark.sql.catalyst.util.quietly -import org.apache.spark.sql.execution.streaming.{CreateAtomicTestManager, FileSystemBasedCheckpointFileManager} +import org.apache.spark.sql.execution.streaming.{CheckpointFileManager, CreateAtomicTestManager, FileContextBasedCheckpointFileManager, FileSystemBasedCheckpointFileManager} import org.apache.spark.sql.execution.streaming.CheckpointFileManager.{CancellableFSDataOutputStream, RenameBasedFSDataOutputStream} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.SQLConf.STREAMING_CHECKPOINT_FILE_MANAGER_CLASS @@ -45,6 +48,7 @@ import org.apache.spark.tags.SlowSQLTest import org.apache.spark.util.{ThreadUtils, Utils} import org.apache.spark.util.ArrayImplicits._ + class NoOverwriteFileSystemBasedCheckpointFileManager(path: Path, hadoopConf: Configuration) extends FileSystemBasedCheckpointFileManager(path, hadoopConf) { @@ -62,6 +66,17 @@ class NoOverwriteFileSystemBasedCheckpointFileManager(path: Path, hadoopConf: Co } } +class TestStateStoreChangelogWriterV101( + fm: CheckpointFileManager, 
+ file: Path, + compressionCodec: CompressionCodec) + extends StateStoreChangelogWriterV1(fm, file, compressionCodec) { + + override def version: Short = 101 + + writeVersion() +} + trait RocksDBStateStoreChangelogCheckpointingTestUtil { val rocksdbChangelogCheckpointingConfKey: String = RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + ".changelogCheckpointing.enabled" @@ -177,14 +192,82 @@ trait AlsoTestWithChangelogCheckpointingEnabled } } +class OpenNumCountedTestInputStream(in: InputStream) extends FSDataInputStream(in) { + import OpenNumCountedTestInputStream._ + + addOpenStreams(this) + + override def close(): Unit = { + removeOpenStream(this) + super.close() + } +} + +class OpenStreamCountedTestFileManager(path: Path, hadoopConf: Configuration) + extends FileContextBasedCheckpointFileManager(path, hadoopConf) { + + override def open(path: Path): FSDataInputStream = { + val stream = new OpenNumCountedTestInputStream(super.open(path)) + stream + } +} + +object OpenNumCountedTestInputStream extends Logging { + private val openStreams = mutable.Map.empty[FSDataInputStream, Throwable] + + def addOpenStreams(stream: FSDataInputStream): Unit = openStreams.synchronized { + openStreams.put(stream, new Throwable()) + } + + def removeOpenStream(stream: FSDataInputStream): Unit = openStreams.synchronized { + openStreams.remove(stream) + } + + def clearOpenStreams(): Unit = openStreams.synchronized { + openStreams.clear() + } + + def assertNoOpenStreams(): Unit = openStreams.synchronized { + val numOpen = openStreams.values.size + if (numOpen > 0) { + for (exc <- openStreams.values) { + logWarning("Leaked filesystem connection created at:") + exc.printStackTrace() + } + throw new IllegalStateException(s"There are $numOpen possibly leaked file streams.", + openStreams.values.head) + } + } +} + @SlowSQLTest -class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with SharedSparkSession { +class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with SharedSparkSession + with PrivateMethodTester { override protected def sparkConf: SparkConf = { super.sparkConf .set(SQLConf.STATE_STORE_PROVIDER_CLASS, classOf[RocksDBStateStoreProvider].getName) } + // In each test we verify opened streams are all closed + private def hadoopConf: Configuration = { + val fmClass = "org.apache.spark.sql.execution.streaming.state." 
+ + "OpenStreamCountedTestFileManager" + val hadoopConf = new Configuration() + hadoopConf.set(STREAMING_CHECKPOINT_FILE_MANAGER_CLASS.parent.key, fmClass) + hadoopConf + } + + override def beforeEach(): Unit = { + OpenNumCountedTestInputStream.clearOpenStreams() + } + + override def afterEach(): Unit = { + eventually(timeout(10.seconds), interval(2.seconds)) { + OpenNumCountedTestInputStream.assertNoOpenStreams() + } + } + testWithColumnFamilies( "RocksDB: check changelog and snapshot version", TestWithChangelogCheckpointingEnabled) { colFamiliesEnabled => @@ -693,7 +776,7 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared " with Changelog Checkpointing") { val dfsRootDir = new File(Utils.createTempDir().getAbsolutePath + "/state/1/1") val fileManager = new RocksDBFileManager( - dfsRootDir.getAbsolutePath, Utils.createTempDir(), new Configuration) + dfsRootDir.getAbsolutePath, Utils.createTempDir(), hadoopConf) val changelogWriter = fileManager.getChangeLogWriter(1) assert(changelogWriter.version === 1) @@ -742,7 +825,7 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared testWithChangelogCheckpointingEnabled("RocksDBFileManager: read and write changelog") { val dfsRootDir = new File(Utils.createTempDir().getAbsolutePath + "/state/1/1") val fileManager = new RocksDBFileManager( - dfsRootDir.getAbsolutePath, Utils.createTempDir(), new Configuration) + dfsRootDir.getAbsolutePath, Utils.createTempDir(), hadoopConf) val changelogWriter = fileManager.getChangeLogWriter(1) assert(changelogWriter.version === 1) @@ -761,6 +844,153 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared null, StateStore.DEFAULT_COL_FAMILY_NAME) } + changelogReader.closeIfNeeded() + + assert(entries.size == expectedEntries.size) + entries.zip(expectedEntries).map{ + case (e1, e2) => assert(e1._1 === e2._1 && e1._2 === e2._2 && e1._3 === e2._3) + } + } + + testWithChangelogCheckpointingEnabled("RocksDBFileManager: StateStoreChangelogReaderFactory " + + "edge case") { + val dfsRootDir = new File(Utils.createTempDir().getAbsolutePath + "/state/1/1") + val fileManager = new RocksDBFileManager( + dfsRootDir.getAbsolutePath, Utils.createTempDir(), hadoopConf) + + val checkpointUniqueId = Some(java.util.UUID.randomUUID.toString) + val lineage: Array[LineageItem] = Array( + LineageItem(1, java.util.UUID.randomUUID.toString), + LineageItem(2, java.util.UUID.randomUUID.toString), + LineageItem(3, java.util.UUID.randomUUID.toString) + ) + + // Create a v1 writer + val changelogWriterV1 = fileManager.getChangeLogWriter(101) + assert(changelogWriterV1.version === 1) + changelogWriterV1.commit() // v1 with empty content + + val changelogReaderV1 = fileManager.getChangelogReader(101) + assert(changelogReaderV1.version === 1) // getChangelogReader should return a v1 reader + changelogReaderV1.closeIfNeeded() + + // Create a v2 writer + val changelogWriterV2 = fileManager.getChangeLogWriter(102, useColumnFamilies = true) + assert(changelogWriterV2.version === 2) + changelogWriterV2.commit() // v2 with empty content + + val changelogReaderV2 = fileManager.getChangelogReader(102) + assert(changelogReaderV2.version === 2) // getChangelogReader should return a v2 reader + changelogReaderV2.closeIfNeeded() + + // Create a v3 writer + val changelogWriterV3 = fileManager.getChangeLogWriter( + 103, useColumnFamilies = false, checkpointUniqueId, Some(lineage)) + assert(changelogWriterV3.version === 3) + changelogWriterV3.commit() // v1 with empty 
content + + val changelogReaderV3 = fileManager.getChangelogReader( + 103, checkpointUniqueId = checkpointUniqueId) + assert(changelogReaderV3.version === 3) // getChangelogReader should return a v3 reader + assert(changelogReaderV3.lineage sameElements lineage) + changelogReaderV3.closeIfNeeded() + + // Create a v4 writer + val changelogWriterV4 = fileManager.getChangeLogWriter( + 104, useColumnFamilies = true, checkpointUniqueId, Some(lineage)) + assert(changelogWriterV4.version === 4) + changelogWriterV4.commit() // v1 with empty content + + val changelogReaderV4 = fileManager.getChangelogReader( + 104, checkpointUniqueId = checkpointUniqueId) + assert(changelogReaderV4.version === 4) // getChangelogReader should return a v4 reader + assert(changelogReaderV4.lineage sameElements lineage) + changelogReaderV4.closeIfNeeded() + } + + testWithChangelogCheckpointingEnabled("RocksDBFileManager: changelog reader / writer " + + "failure cases") { + val dfsRootDir = new File(Utils.createTempDir().getAbsolutePath + "/state/1/1") + val fileManager = new RocksDBFileManager( + dfsRootDir.getAbsolutePath, Utils.createTempDir(), hadoopConf) + // Failure case 1: reader writer version mismatch + // Create a v1 writer + val changelogWriterV1 = fileManager.getChangeLogWriter(101) + assert(changelogWriterV1.version === 1) + + (1 to 5).foreach(i => changelogWriterV1.put(i.toString, i.toString)) + (2 to 4).foreach(j => changelogWriterV1.delete(j.toString)) + + changelogWriterV1.commit() + // Success case, when reading from the same file, a V1 reader should be constructed. + val changelogReaderV1 = fileManager.getChangelogReader(101) + assert(changelogReaderV1.version === 1) + changelogReaderV1.closeIfNeeded() + + // Failure case, force creating a V3 reader. + val dfsChangelogFile = PrivateMethod[Path](Symbol("dfsChangelogFile")) + val codec = PrivateMethod[CompressionCodec](Symbol("codec")) + var changelogFile = fileManager invokePrivate dfsChangelogFile(101L, None) + val compressionCodec = fileManager invokePrivate codec() + val fm = CheckpointFileManager.create(new Path(dfsRootDir.getAbsolutePath), new Configuration) + val e = intercept[AssertionError] { + new StateStoreChangelogReaderV3(fm, changelogFile, compressionCodec) + } + assert(e.getMessage.contains("Changelog version mismatch")) + + changelogFile = fileManager invokePrivate dfsChangelogFile(1L, None) + // Failure case 2: readerFactory throw when reading from ckpt built in future Spark version + // Create a v101 writer + val changelogWriter = new TestStateStoreChangelogWriterV101( + fm, changelogFile, compressionCodec) + assert(changelogWriter.version === 101) + + changelogWriter.commit() + + // Failure case, force creating a V3 reader. 
+ val ex = intercept[SparkException] { + fileManager.getChangelogReader(1) + } + checkError( + ex, + condition = "CANNOT_LOAD_STATE_STORE.INVALID_CHANGE_LOG_READER_VERSION", + parameters = Map("version" -> 101.toString) + ) + assert(ex.getMessage.contains("please upgrade your Spark")) + } + + testWithChangelogCheckpointingEnabled("RocksDBFileManager: read and write changelog " + + "with state checkpoint id enabled") { + val dfsRootDir = new File(Utils.createTempDir().getAbsolutePath + "/state/1/1") + val fileManager = new RocksDBFileManager( + dfsRootDir.getAbsolutePath, Utils.createTempDir(), hadoopConf) + val checkpointUniqueId = Some(java.util.UUID.randomUUID.toString) + val lineage: Array[LineageItem] = Array( + LineageItem(1, java.util.UUID.randomUUID.toString), + LineageItem(2, java.util.UUID.randomUUID.toString), + LineageItem(3, java.util.UUID.randomUUID.toString) + ) + val changelogWriter = fileManager.getChangeLogWriter( + 3, useColumnFamilies = false, checkpointUniqueId, Some(lineage)) + assert(changelogWriter.version === 3) + + (1 to 5).foreach(i => changelogWriter.put(i.toString, i.toString)) + (2 to 4).foreach(j => changelogWriter.delete(j.toString)) + + changelogWriter.commit() + val changelogReader = fileManager.getChangelogReader(3, checkpointUniqueId) + assert(changelogReader.version === 3) + assert(changelogReader.lineage sameElements lineage) + val entries = changelogReader.toSeq + val expectedEntries = (1 to 5).map { i => + (RecordType.PUT_RECORD, i.toString.getBytes, + i.toString.getBytes, StateStore.DEFAULT_COL_FAMILY_NAME) + } ++ (2 to 4).map { j => + (RecordType.DELETE_RECORD, j.toString.getBytes, + null, StateStore.DEFAULT_COL_FAMILY_NAME) + } + changelogReader.closeIfNeeded() + assert(entries.size == expectedEntries.size) entries.zip(expectedEntries).map{ case (e1, e2) => assert(e1._1 === e2._1 && e1._2 === e2._2 && e1._3 === e2._3) @@ -771,8 +1001,8 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared "RocksDBFileManager: read and write v2 changelog with default col family") { val dfsRootDir = new File(Utils.createTempDir().getAbsolutePath + "/state/1/1") val fileManager = new RocksDBFileManager( - dfsRootDir.getAbsolutePath, Utils.createTempDir(), new Configuration) - val changelogWriter = fileManager.getChangeLogWriter(1, true) + dfsRootDir.getAbsolutePath, Utils.createTempDir(), hadoopConf) + val changelogWriter = fileManager.getChangeLogWriter(1, useColumnFamilies = true) assert(changelogWriter.version === 2) (1 to 5).foreach { i => changelogWriter.put(i.toString, i.toString) @@ -786,7 +1016,7 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared } changelogWriter.commit() - val changelogReader = fileManager.getChangelogReader(1, true) + val changelogReader = fileManager.getChangelogReader(1) assert(changelogReader.version === 2) val entries = changelogReader.toSeq val expectedEntries = (1 to 5).map { i => @@ -796,6 +1026,52 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared } ++ (2 to 4).map { j => (RecordType.DELETE_RECORD, j.toString.getBytes, null) } + changelogReader.closeIfNeeded() + + assert(entries.size == expectedEntries.size) + entries.zip(expectedEntries).map{ + case (e1, e2) => assert(e1._1 === e2._1 && e1._2 === e2._2 && e1._3 === e2._3) + } + } + + testWithChangelogCheckpointingEnabled("RocksDBFileManager: read and write v2 changelog with " + + "default col family and state checkpoint id enabled") { + val dfsRootDir = new 
File(Utils.createTempDir().getAbsolutePath + "/state/1/1") + val fileManager = new RocksDBFileManager( + dfsRootDir.getAbsolutePath, Utils.createTempDir(), hadoopConf) + val checkpointUniqueId = Some(java.util.UUID.randomUUID.toString) + val lineage: Array[LineageItem] = Array( + LineageItem(1, java.util.UUID.randomUUID.toString), + LineageItem(2, java.util.UUID.randomUUID.toString), + LineageItem(3, java.util.UUID.randomUUID.toString) + ) + val changelogWriter = fileManager.getChangeLogWriter( + 1, useColumnFamilies = true, checkpointUniqueId, Some(lineage)) + assert(changelogWriter.version === 4) + (1 to 5).foreach { i => + changelogWriter.put(i.toString, i.toString) + } + (1 to 5).foreach { i => + changelogWriter.merge(i.toString, i.toString) + } + + (2 to 4).foreach { j => + changelogWriter.delete(j.toString) + } + + changelogWriter.commit() + val changelogReader = fileManager.getChangelogReader(1, checkpointUniqueId) + assert(changelogReader.version === 4) + assert(changelogReader.lineage sameElements lineage) + val entries = changelogReader.toSeq + val expectedEntries = (1 to 5).map { i => + (RecordType.PUT_RECORD, i.toString.getBytes, i.toString.getBytes) + } ++ (1 to 5).map { i => + (RecordType.MERGE_RECORD, i.toString.getBytes, i.toString.getBytes) + } ++ (2 to 4).map { j => + (RecordType.DELETE_RECORD, j.toString.getBytes, null) + } + changelogReader.closeIfNeeded() assert(entries.size == expectedEntries.size) entries.zip(expectedEntries).map{ @@ -810,7 +1086,7 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared try { val verificationDir = Utils.createTempDir().getAbsolutePath val fileManager = new RocksDBFileManager( - dfsRootDir.getAbsolutePath, Utils.createTempDir(), new Configuration) + dfsRootDir.getAbsolutePath, Utils.createTempDir(), hadoopConf) // Save a version of empty checkpoint files val cpFiles = Seq() generateFiles(verificationDir, cpFiles) @@ -910,10 +1186,10 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared // Use 2 file managers here to emulate concurrent execution // that checkpoint the same version of state val fileManager = new RocksDBFileManager( - dfsRootDir, Utils.createTempDir(), new Configuration) + dfsRootDir, Utils.createTempDir(), hadoopConf) val rocksDBFileMapping = new RocksDBFileMapping() val fileManager_ = new RocksDBFileManager( - dfsRootDir, Utils.createTempDir(), new Configuration) + dfsRootDir, Utils.createTempDir(), hadoopConf) val sstDir = s"$dfsRootDir/SSTs" def numRemoteSSTFiles: Int = listFiles(sstDir).length val logDir = s"$dfsRootDir/logs" @@ -994,7 +1270,7 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared withTempDir { dir => val dfsRootDir = dir.getAbsolutePath val fileManager = new RocksDBFileManager( - dfsRootDir, Utils.createTempDir(), new Configuration) + dfsRootDir, Utils.createTempDir(), hadoopConf) (new File(dfsRootDir, "SSTs")).mkdir() (new File(dfsRootDir, "logs")).mkdir() @@ -1053,7 +1329,7 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared val dfsRootDir = dir.getAbsolutePath val verificationDir = Utils.createTempDir().getAbsolutePath // local dir to load checkpoints val fileManager = new RocksDBFileManager( - dfsRootDir, Utils.createTempDir(), new Configuration) + dfsRootDir, Utils.createTempDir(), hadoopConf) val sstDir = s"$dfsRootDir/SSTs" def numRemoteSSTFiles: Int = listFiles(sstDir).length val logDir = s"$dfsRootDir/logs" @@ -2446,7 +2722,7 @@ class RocksDBSuite extends 
AlsoTestWithChangelogCheckpointingEnabled with Shared remoteDir: String, version: Int = 0, conf: RocksDBConf = dbConf, - hadoopConf: Configuration = new Configuration(), + hadoopConf: Configuration = hadoopConf, useColumnFamilies: Boolean = false, localDir: File = Utils.createTempDir())( func: RocksDB => T): T = {
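
As a reading aid, here is a minimal standalone sketch of the changelog header layout described above, assuming json4s `DefaultFormats` as used in `StateStoreChangelog.scala` (compression and the actual record encoding are omitted): the version marker is the first UTF entry and the lineage JSON is the second, written before any records. The `LineageItem` class below mirrors the case class added by this patch; the object and method names are made up for illustration.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

import org.json4s._
import org.json4s.jackson.Serialization

// Mirrors the case class added in StateStoreChangelog.scala.
case class LineageItem(version: Long, checkpointUniqueId: String)

object LineageLineSketch {
  implicit val formats: Formats = DefaultFormats

  def main(args: Array[String]): Unit = {
    val lineage = Array(
      LineageItem(1L, java.util.UUID.randomUUID.toString),
      LineageItem(2L, java.util.UUID.randomUUID.toString))

    // Writer side (v3/v4): version marker first, lineage JSON second,
    // then the actual change records would follow.
    val buffer = new ByteArrayOutputStream()
    val out = new DataOutputStream(buffer)
    out.writeUTF("v3")
    out.writeUTF(Serialization.write(lineage))
    out.close()

    // Reader side: consuming the version and the lineage during initialization
    // leaves the stream positioned at the first record.
    val in = new DataInputStream(new ByteArrayInputStream(buffer.toByteArray))
    assert(in.readUTF() == "v3")
    val restored = Serialization.read[Array[LineageItem]](in.readUTF())
    assert(restored.sameElements(lineage))
    in.close()
  }
}
```

Because the lineage occupies its own UTF entry, the reader can consume it while it is being constructed, and subsequent `getNext` calls only ever see record data.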
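
Similarly, a simplified standalone model of the version detection that `StateStoreChangelogReaderFactory.constructChangelogReader` performs on the first line; `sniffVersion` and the surrounding object are hypothetical names, and the `EOFException` branch mirrors the empty-v1-changelog edge case exercised by the new tests.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream, EOFException}

object ChangelogVersionSniffing {
  // Simplified model of the factory's readVersion(): the first UTF entry of the
  // (decompressed) changelog decides which reader version to construct.
  def sniffVersion(input: DataInputStream): Short = {
    try {
      val first = input.readUTF()
      // v2+ changelogs start with "v<N>"; v1 changelogs have no version line,
      // so anything that does not start with "v" is treated as version 1.
      if (first.startsWith("v")) first.stripPrefix("v").toShort else 1
    } catch {
      // An empty v1 changelog contains only the -1 end-of-file marker,
      // which makes readUTF() fail with EOFException.
      case _: EOFException => 1
    }
  }

  def main(args: Array[String]): Unit = {
    // A changelog whose first entry is the "v3" version marker.
    val v3Header = {
      val bytes = new ByteArrayOutputStream()
      val out = new DataOutputStream(bytes)
      out.writeUTF("v3")
      out.close()
      bytes.toByteArray
    }
    assert(sniffVersion(new DataInputStream(new ByteArrayInputStream(v3Header))) == 3)

    // An empty v1 changelog: just the -1 end-of-file marker.
    val emptyV1 = {
      val bytes = new ByteArrayOutputStream()
      val out = new DataOutputStream(bytes)
      out.writeInt(-1)
      out.close()
      bytes.toByteArray
    }
    assert(sniffVersion(new DataInputStream(new ByteArrayInputStream(emptyV1))) == 1)
  }
}
```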