diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreCheckpointFormatV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreCheckpointFormatV2Suite.scala
index 9ac74eb5b9e8f..74bd7a67b3b2f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreCheckpointFormatV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreCheckpointFormatV2Suite.scala
@@ -124,8 +124,8 @@ case class CkptIdCollectingStateStoreWrapper(innerStore: StateStore) extends Sta
   override def commit(): Long = innerStore.commit()
   override def metrics: StateStoreMetrics = innerStore.metrics
 
-  override def getStateStoreCheckpointInfo(): StateStoreCheckpointInfo = {
-    val ret = innerStore.getStateStoreCheckpointInfo()
+  override def getStateStoreCheckpointInfo: StateStoreCheckpointInfo = {
+    val ret = innerStore.getStateStoreCheckpointInfo
     CkptIdCollectingStateStoreWrapper.addCheckpointInfo(ret)
     ret
   }
@@ -182,7 +182,7 @@ class CkptIdCollectingStateStoreProviderWrapper extends StateStoreProvider {
 // return their own state store checkpointID. This can happen because of task retry or
 // speculative execution.
 class RocksDBStateStoreCheckpointFormatV2Suite extends StreamTest
-  with AlsoTestWithChangelogCheckpointingEnabled {
+  with AlsoTestWithRocksDBFeatures {
   import testImplicits._
 
   val providerClassName = classOf[CkptIdCollectingStateStoreProviderWrapper].getCanonicalName
@@ -211,6 +211,328 @@ class RocksDBStateStoreCheckpointFormatV2Suite extends StreamTest
     }
   }
 
+  val changelogEnabled =
+    "spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled" -> "true"
+  val changelogDisabled =
+    "spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled" -> "false"
+  val ckptv1 = SQLConf.STATE_STORE_CHECKPOINT_FORMAT_VERSION.key -> "1"
+  val ckptv2 = SQLConf.STATE_STORE_CHECKPOINT_FORMAT_VERSION.key -> "2"
+
+  val testConfigSetups = Seq(
+    // Enable and disable changelog under ckpt v2
+    (Seq(changelogEnabled, ckptv2), Seq(changelogEnabled, ckptv2)),
+    (Seq(changelogDisabled, ckptv2), Seq(changelogDisabled, ckptv2)),
+    // Cross version, cross changelog enabled/disabled
+    (Seq(changelogDisabled, ckptv1), Seq(changelogDisabled, ckptv2)),
+    (Seq(changelogEnabled, ckptv1), Seq(changelogEnabled, ckptv2)),
+    (Seq(changelogDisabled, ckptv1), Seq(changelogEnabled, ckptv2)),
+    (Seq(changelogEnabled, ckptv1), Seq(changelogDisabled, ckptv2))
+  )
+
+  testConfigSetups.foreach {
+    case (firstRunConfig, secondRunConfig) =>
+      testWithRocksDBStateStore("checkpointFormatVersion2 Backward Compatibility - simple agg - " +
+        s"first run: (changeLogEnabled, ckpt ver): " +
+        s"${firstRunConfig(0)._2}, ${firstRunConfig(1)._2}" +
+        s" - second run: ${secondRunConfig(0)._2}, ${secondRunConfig(1)._2}") {
+        withTempDir { checkpointDir =>
+          val inputData = MemoryStream[Int]
+          val aggregated =
+            inputData
+              .toDF()
+              .groupBy($"value")
+              .agg(count("*"))
+              .as[(Int, Long)]
+
+          withSQLConf(firstRunConfig: _*) {
+            testStream(aggregated, Update)(
+              StartStream(checkpointLocation = checkpointDir.getAbsolutePath),
+              AddData(inputData, 3),
+              CheckLastBatch((3, 1)),
+              AddData(inputData, 3, 2),
+              CheckLastBatch((3, 2), (2, 1)),
+              StopStream
+            )
+
+            testStream(aggregated, Update)(
+              StartStream(checkpointLocation = checkpointDir.getAbsolutePath),
+              AddData(inputData, 3, 2, 1),
+              CheckLastBatch((3, 3), (2, 2), (1, 1)),
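+              // State is restored from the checkpoint, so counts keep
+              // accumulating across restarts; values 4 and 5 below are new keys.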
+              AddData(inputData, 4, 4, 4, 4),
+              CheckLastBatch((4, 4)),
+              AddData(inputData, 5, 5),
+              CheckLastBatch((5, 2))
+            )
+          }
+
+          withSQLConf(secondRunConfig: _*) {
+            testStream(aggregated, Update)(
+              StartStream(checkpointLocation = checkpointDir.getAbsolutePath),
+              AddData(inputData, 4),
+              CheckLastBatch((4, 5))
+            )
+          }
+        }
+      }
+  }
+
+  testConfigSetups.foreach {
+    case (firstRunConfig, secondRunConfig) =>
+      testWithRocksDBStateStore("checkpointFormatVersion2 Backward Compatibility - dedup - " +
+        s"first run: (changeLogEnabled, ckpt ver): " +
+        s"${firstRunConfig(0)._2}, ${firstRunConfig(1)._2}" +
+        s" - second run: ${secondRunConfig(0)._2}, ${secondRunConfig(1)._2}") {
+        withTempDir { checkpointDir =>
+          val inputData = MemoryStream[Int]
+          val deduplicated = inputData
+            .toDF()
+            .dropDuplicates("value")
+            .as[Int]
+
+          withSQLConf(firstRunConfig: _*) {
+            testStream(deduplicated, Update)(
+              StartStream(checkpointLocation = checkpointDir.getAbsolutePath),
+              AddData(inputData, 3),
+              CheckLastBatch(3),
+              AddData(inputData, 3, 2),
+              CheckLastBatch(2),
+              AddData(inputData, 3, 2, 1),
+              CheckLastBatch(1),
+              StopStream
+            )
+
+            // Test recovery
+            testStream(deduplicated, Update)(
+              StartStream(checkpointLocation = checkpointDir.getAbsolutePath),
+              AddData(inputData, 4, 1, 3),
+              CheckLastBatch(4),
+              AddData(inputData, 5, 4, 4),
+              CheckLastBatch(5),
+              StopStream
+            )
+          }
+
+          withSQLConf(secondRunConfig: _*) {
+            // crash recovery again
+            testStream(deduplicated, Update)(
+              StartStream(checkpointLocation = checkpointDir.getAbsolutePath),
+              AddData(inputData, 4, 7),
+              CheckLastBatch(7)
+            )
+          }
+        }
+      }
+  }
+
+  testConfigSetups.foreach {
+    case (firstRunConfig, secondRunConfig) =>
+      testWithRocksDBStateStore("checkpointFormatVersion2 Backward Compatibility - " +
+        s"FlatMapGroupsWithState - first run: (changeLogEnabled, ckpt ver): " +
+        s"${firstRunConfig(0)._2}, ${firstRunConfig(1)._2}" +
+        s" - second run: ${secondRunConfig(0)._2}, ${secondRunConfig(1)._2}") {
+        withTempDir { checkpointDir =>
+          val stateFunc = (key: Int, values: Iterator[Int], state: GroupState[Int]) => {
+            val count: Int = state.getOption.getOrElse(0) + values.size
+            state.update(count)
+            Iterator((key, count))
+          }
+
+          val inputData = MemoryStream[Int]
+          val aggregated = inputData
+            .toDF()
+            .toDF("key")
+            .selectExpr("key")
+            .as[Int]
+            .repartition($"key")
+            .groupByKey(x => x)
+            .flatMapGroupsWithState(OutputMode.Update, GroupStateTimeout.NoTimeout())(stateFunc)
+
+          withSQLConf(firstRunConfig: _*) {
+            testStream(aggregated, Update)(
+              StartStream(checkpointLocation = checkpointDir.getAbsolutePath),
+              AddData(inputData, 3),
+              CheckLastBatch((3, 1)),
+              AddData(inputData, 3, 2),
+              CheckLastBatch((3, 2), (2, 1)),
+              StopStream
+            )
+
+            // Test recovery
+            testStream(aggregated, Update)(
+              StartStream(checkpointLocation = checkpointDir.getAbsolutePath),
+              AddData(inputData, 4, 1, 3),
+              CheckLastBatch((4, 1), (1, 1), (3, 3)),
+              AddData(inputData, 5, 4, 4),
+              CheckLastBatch((5, 1), (4, 3)),
+              StopStream
+            )
+          }
+
+          withSQLConf(secondRunConfig: _*) {
+            // crash recovery again
+            testStream(aggregated, Update)(
+              StartStream(checkpointLocation = checkpointDir.getAbsolutePath),
+              AddData(inputData, 4, 7),
+              CheckLastBatch((4, 4), (7, 1)),
+              AddData(inputData, 5),
+              CheckLastBatch((5, 2)),
+              StopStream
+            )
+          }
+        }
+      }
+  }
+
+  testConfigSetups.foreach {
+    case (firstRunConfig, secondRunConfig) =>
+      testWithRocksDBStateStore("checkpointFormatVersion2 Backward Compatibility - ss join - " +
+        s"first run: (changeLogEnabled, ckpt ver): " +
+        s"${firstRunConfig(0)._2}, ${firstRunConfig(1)._2}" +
+        s" - second run: ${secondRunConfig(0)._2}, ${secondRunConfig(1)._2}") {
+        withTempDir { checkpointDir =>
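+          // A stream-stream inner join buffers input rows from both sides in
+          // state, so rows arriving before a restart can still match afterwards.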
+          val inputData1 = MemoryStream[Int]
+          val inputData2 = MemoryStream[Int]
+
+          val df1 = inputData1.toDS().toDF("value")
+          val df2 = inputData2.toDS().toDF("value")
+
+          val joined = df1.join(df2, df1("value") === df2("value"))
+
+          withSQLConf(firstRunConfig: _*) {
+            testStream(joined, OutputMode.Append)(
+              StartStream(checkpointLocation = checkpointDir.getAbsolutePath),
+              AddData(inputData1, 3, 2),
+              AddData(inputData2, 3),
+              CheckLastBatch((3, 3)),
+              AddData(inputData2, 2),
+              // This data will be used after restarting the query
+              AddData(inputData1, 5),
+              CheckLastBatch((2, 2)),
+              StopStream
+            )
+
+            // Test recovery.
+            testStream(joined, OutputMode.Append)(
+              StartStream(checkpointLocation = checkpointDir.getAbsolutePath),
+              AddData(inputData1, 4),
+              AddData(inputData2, 5),
+              CheckLastBatch((5, 5)),
+              AddData(inputData2, 4),
+              // This data will be used after restarting the query
+              AddData(inputData1, 7),
+              CheckLastBatch((4, 4)),
+              StopStream
+            )
+          }
+
+          withSQLConf(secondRunConfig: _*) {
+            // recovery again
+            testStream(joined, OutputMode.Append)(
+              StartStream(checkpointLocation = checkpointDir.getAbsolutePath),
+              AddData(inputData1, 6),
+              AddData(inputData2, 6),
+              CheckLastBatch((6, 6)),
+              AddData(inputData2, 7),
+              CheckLastBatch((7, 7)),
+              StopStream
+            )
+
+            // recovery again
+            testStream(joined, OutputMode.Append)(
+              StartStream(checkpointLocation = checkpointDir.getAbsolutePath),
+              AddData(inputData1, 8),
+              AddData(inputData2, 8),
+              CheckLastBatch((8, 8)),
+              StopStream
+            )
+          }
+        }
+      }
+  }
+
+  testConfigSetups.foreach {
+    case (firstRunConfig, secondRunConfig) =>
+      testWithRocksDBStateStore("checkpointFormatVersion2 Backward Compatibility - " +
+        "transformWithState - first run: (changeLogEnabled, ckpt ver): " +
+        s"${firstRunConfig(0)._2}, ${firstRunConfig(1)._2}" +
+        s" - second run: ${secondRunConfig(0)._2}, ${secondRunConfig(1)._2}") {
+        withTempDir { checkpointDir =>
+          val inputData = MemoryStream[String]
+          val result = inputData.toDS()
+            .groupByKey(x => x)
+            .transformWithState(new RunningCountStatefulProcessor(),
+              TimeMode.None(),
+              OutputMode.Update())
+
+          withSQLConf(firstRunConfig: _*) {
+            testStream(result, Update())(
+              StartStream(checkpointLocation = checkpointDir.getAbsolutePath),
+              AddData(inputData, "a"),
+              CheckNewAnswer(("a", "1")),
+              Execute { q =>
+                assert(q.lastProgress.stateOperators(0)
+                  .customMetrics.get("numValueStateVars") > 0)
+                assert(q.lastProgress.stateOperators(0)
+                  .customMetrics.get("numRegisteredTimers") == 0)
+              },
+              AddData(inputData, "a", "b"),
+              CheckNewAnswer(("a", "2"), ("b", "1")),
+              StopStream
+            )
+            testStream(result, Update())(
+              StartStream(checkpointLocation = checkpointDir.getAbsolutePath),
+              // should remove state for "a" and not return anything for a
+              AddData(inputData, "a", "b"),
+              CheckNewAnswer(("b", "2")),
+              StopStream
+            )
+          }
+
+          withSQLConf(secondRunConfig: _*) {
+            testStream(result, Update())(
+              StartStream(checkpointLocation = checkpointDir.getAbsolutePath),
+              // should recreate state for "a" and return count as 1
+              AddData(inputData, "a", "c"),
+              CheckNewAnswer(("a", "1"), ("c", "1")),
+              StopStream
+            )
+          }
+        }
+      }
+  }
+
+  test("checkpointFormatVersion2 validate transformWithState " +
+    "without explicit checkpoint location") {
+    val inputData = MemoryStream[String]
+    val result = inputData.toDS()
+      .groupByKey(x => x)
+      .transformWithState(new RunningCountStatefulProcessor(),
+        TimeMode.None(),
+        OutputMode.Update())
+
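+    // RunningCountStatefulProcessor clears a key's state once its running count
+    // reaches 3, so "a" goes silent below and later restarts from 1.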
+    testStream(result, Update())(
+      AddData(inputData, "a"),
+      CheckNewAnswer(("a", "1")),
+      Execute { q =>
+        assert(q.lastProgress.stateOperators(0).customMetrics.get("numValueStateVars") > 0)
+        assert(q.lastProgress.stateOperators(0).customMetrics.get("numRegisteredTimers") == 0)
+      },
+      AddData(inputData, "a", "b"),
+      CheckNewAnswer(("a", "2"), ("b", "1")),
+      StopStream,
+      StartStream(),
+      AddData(inputData, "a", "b"), // should remove state for "a" and not return anything for a
+      CheckNewAnswer(("b", "2")),
+      StopStream,
+      StartStream(),
+      AddData(inputData, "a", "c"), // should recreate state for "a" and return count as 1
+      CheckNewAnswer(("a", "1"), ("c", "1"))
+    )
+  }
+
   // This test enables checkpoint format V2 without validating the checkpoint ID, just to make
   // sure it doesn't break and returns the correct query results.
   testWithChangelogCheckpointingEnabled(s"checkpointFormatVersion2") {
@@ -437,6 +759,15 @@ class RocksDBStateStoreCheckpointFormatV2Suite extends StreamTest
         CheckLastBatch((7, 7)),
         StopStream
       )
+
+      // recovery again
+      testStream(joined, OutputMode.Append)(
+        StartStream(checkpointLocation = checkpointDir.getAbsolutePath),
+        AddData(inputData1, 8),
+        AddData(inputData2, 8),
+        CheckLastBatch((8, 8)),
+        StopStream
+      )
     }
     val checkpointInfoList = CkptIdCollectingStateStoreWrapper.getStateStoreCheckpointInfos
     // We sometimes add data to both data sources before CheckLastBatch(). They could be picked
@@ -445,11 +776,11 @@ class RocksDBStateStoreCheckpointFormatV2Suite extends StreamTest
     val numBatches = checkpointInfoList.size / 8
 
     // We don't pass batch versions that would need base checkpoint IDs because we don't know
-    // batchIDs for that. We only know that there are 3 batches without it.
+    // batchIDs for that. We only know that there is 1 batch without it.
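+    // All but the very first batch should therefore carry a base checkpoint ID,
+    // which the assertion below checks as (numBatches - 1) * 8.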
     validateCheckpointInfo(numBatches, 4, Set())
     assert(CkptIdCollectingStateStoreWrapper
       .getStateStoreCheckpointInfos
-      .count(_.baseStateStoreCkptId.isDefined) == (numBatches - 3) * 8)
+      .count(_.baseStateStoreCkptId.isDefined) == (numBatches - 1) * 8)
   }
 
   testWithCheckpointInfoTracked(s"checkpointFormatVersion2 validate DropDuplicates") {
@@ -541,4 +872,35 @@ class RocksDBStateStoreCheckpointFormatV2Suite extends StreamTest
     }
     validateCheckpointInfo(6, 1, Set(2, 4, 6))
   }
+
+  test("checkpointFormatVersion2 validate transformWithState") {
+    withTempDir { checkpointDir =>
+      val inputData = MemoryStream[String]
+      val result = inputData.toDS()
+        .groupByKey(x => x)
+        .transformWithState(new RunningCountStatefulProcessor(),
+          TimeMode.None(),
+          OutputMode.Update())
+
+      testStream(result, Update())(
+        StartStream(checkpointLocation = checkpointDir.getAbsolutePath),
+        AddData(inputData, "a"),
+        CheckNewAnswer(("a", "1")),
+        Execute { q =>
+          assert(q.lastProgress.stateOperators(0).customMetrics.get("numValueStateVars") > 0)
+          assert(q.lastProgress.stateOperators(0).customMetrics.get("numRegisteredTimers") == 0)
+        },
+        AddData(inputData, "a", "b"),
+        CheckNewAnswer(("a", "2"), ("b", "1")),
+        StopStream,
+        StartStream(checkpointLocation = checkpointDir.getAbsolutePath),
+        AddData(inputData, "a", "b"), // should remove state for "a" and not return anything for a
+        CheckNewAnswer(("b", "2")),
+        StopStream,
+        StartStream(checkpointLocation = checkpointDir.getAbsolutePath),
+        AddData(inputData, "a", "c"), // should recreate state for "a" and return count as 1
+        CheckNewAnswer(("a", "1"), ("c", "1"))
+      )
+    }
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
index 8fde216c14411..5549b5f72ea90 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
@@ -790,6 +790,60 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
     }
   }
 
+  testWithChangelogCheckpointingEnabled(
+    "RocksDBFileManager: read and write changelog with V2 format") {
+    val dfsRootDir = new File(Utils.createTempDir().getAbsolutePath + "/state/1/1")
+    val fileManager = new RocksDBFileManager(
+      dfsRootDir.getAbsolutePath, Utils.createTempDir(), new Configuration)
+    val uuid = UUID.randomUUID().toString
+    val changelogWriter = fileManager.getChangeLogWriter(1, checkpointUniqueId = Some(uuid))
+    changelogWriter.writeLineage(Array((1, uuid)))
+    assert(changelogWriter.version === 1)
+
+    (1 to 5).foreach(i => changelogWriter.put(i.toString, i.toString))
+    (2 to 4).foreach(j => changelogWriter.delete(j.toString))
+
+    changelogWriter.commit()
+    val changelogReader = fileManager.getChangelogReader(1, checkpointUniqueId = Some(uuid))
+    assert(changelogReader.version === 1)
+    val entries = changelogReader.toSeq
+    val expectedEntries = (1 to 5).map { i =>
+      (RecordType.PUT_RECORD, i.toString.getBytes,
+        i.toString.getBytes, StateStore.DEFAULT_COL_FAMILY_NAME)
+    } ++ (2 to 4).map { j =>
+      (RecordType.DELETE_RECORD, j.toString.getBytes,
+        null, StateStore.DEFAULT_COL_FAMILY_NAME)
+    }
+
+    assert(entries.size == expectedEntries.size)
+    entries.zip(expectedEntries).foreach {
+      case (e1, e2) => assert(e1._1 === e2._1 && e1._2 === e2._2 && e1._3 === e2._3)
+    }
+  }
+
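+  // The V2 changelog layout (selected by passing a checkpointUniqueId) starts
+  // with a lineage header, so a reader for one format cannot parse a file
+  // written in the other; the next test verifies the V2 reader rejects a V1 file.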
+  testWithChangelogCheckpointingEnabled(
+    "RocksDBFileManager: changelog reader / writer with V2 format should not be able to" +
+    " load a V1 changelog") {
+    val dfsRootDir = new File(Utils.createTempDir().getAbsolutePath + "/state/1/1")
+    val fileManager = new RocksDBFileManager(
+      dfsRootDir.getAbsolutePath, Utils.createTempDir(), new Configuration)
+    // First write a changelog with the V1 writer (no checkpointUniqueId)
+    val changelogWriterV1 = fileManager.getChangeLogWriter(1)
+    assert(changelogWriterV1.version === 1)
+
+    (1 to 5).foreach(i => changelogWriterV1.put(i.toString, i.toString))
+    (2 to 4).foreach(j => changelogWriterV1.delete(j.toString))
+
+    changelogWriterV1.commit()
+    val fileManagerV2 = new RocksDBFileManager(
+      dfsRootDir.getAbsolutePath, Utils.createTempDir(), new Configuration)
+    val uuid = UUID.randomUUID().toString
+    val e = intercept[SparkException] {
+      fileManagerV2.getChangelogReader(1, checkpointUniqueId = Some(uuid))
+    }
+    assert(e.getErrorClass == "CANNOT_LOAD_STATE_STORE.CANNOT_READ_STREAMING_STATE_FILE")
+  }
+
   testWithColumnFamilies("RocksDBFileManager: create init dfs directory with " +
     s"unknown number of keys",
     TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled =>
@@ -890,6 +944,187 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
     }
   }
 
+  testWithChangelogCheckpointingEnabled("RocksDB Fault Tolerance: correctly handle when there " +
+    "are multiple snapshot files for the same version") {
+    val enableStateStoreCheckpointIds = true
+    val useColumnFamily = true
+    val remoteDir = Utils.createTempDir().toString
+    new File(remoteDir).delete() // to make sure that the directory gets created
+    val enableChangelogCheckpointingConf =
+      dbConf.copy(enableChangelogCheckpointing = true, minVersionsToRetain = 20,
+        minDeltasForSnapshot = 3)
+
+    // Simulate when there are multiple snapshot files for the same version
+    // The first DB writes to version 0 with uniqueId
+    val versionToUniqueId1 = new mutable.HashMap[Long, String]()
+    withDB(remoteDir, conf = enableChangelogCheckpointingConf,
+      useColumnFamilies = useColumnFamily,
+      enableStateStoreCheckpointIds = enableStateStoreCheckpointIds,
+      versionToUniqueId = versionToUniqueId1) { db =>
+      db.load(0)
+      db.put("a", "1")
+      db.commit()
+
+      // Add some change log files after the snapshot
+      for (version <- 2 to 5) {
+        db.load(version - 1)
+        db.put(version.toString, version.toString) // put "2" -> "2", "3" -> "3", ...
+        db.commit()
+      }
+
+      // doMaintenance uploads the snapshot
+      db.doMaintenance()
+
+      for (version <- 6 to 10) {
+        db.load(version - 1)
+        db.put(version.toString, version.toString)
+        db.commit()
+      }
+    }
+
+    // The second DB writes to version 0 with another uniqueId
+    val versionToUniqueId2 = new mutable.HashMap[Long, String]()
+    withDB(remoteDir, conf = enableChangelogCheckpointingConf,
+      useColumnFamilies = useColumnFamily,
+      enableStateStoreCheckpointIds = enableStateStoreCheckpointIds,
+      versionToUniqueId = versionToUniqueId2) { db =>
+      db.load(0)
+      db.put("b", "2")
+      db.commit()
+      // Add some change log files after the snapshot
+      for (version <- 2 to 5) {
+        db.load(version - 1)
+        db.put(version.toString, (version + 1).toString) // put "2" -> "3", "3" -> "4", ...
+        db.commit()
+      }
+
+      // doMaintenance uploads the snapshot
+      db.doMaintenance()
+
+      for (version <- 6 to 10) {
+        db.load(version - 1)
+        db.put(version.toString, (version + 1).toString)
+        db.commit()
+      }
+    }
+
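+    // Both runs uploaded a snapshot for the same version; the lineage tracked
+    // in versionToUniqueId1 tells load() which run's files to read back.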
+    // During a load() with lineage from the first RocksDB,
+    // the DB should load with the data in the first DB
+    withDB(remoteDir, conf = enableChangelogCheckpointingConf,
+      useColumnFamilies = useColumnFamily,
+      enableStateStoreCheckpointIds = enableStateStoreCheckpointIds,
+      versionToUniqueId = versionToUniqueId1) { db =>
+      db.load(10)
+      assert(toStr(db.get("a")) === "1")
+      for (version <- 2 to 10) {
+        // "2" -> "2", "3" -> "3", ...
+        assert(toStr(db.get(version.toString)) === version.toString)
+      }
+    }
+  }
+
+  testWithChangelogCheckpointingEnabled("RocksDB Fault Tolerance: correctly handle when loading " +
+    "from version v with v.changelog in checkpoint format v2") {
+    var enableStateStoreCheckpointIds = false
+    val useColumnFamily = true
+    val remoteDir = Utils.createTempDir().toString
+    new File(remoteDir).delete() // to make sure that the directory gets created
+    val enableChangelogCheckpointingConf =
+      dbConf.copy(enableChangelogCheckpointing = true, minVersionsToRetain = 20,
+        minDeltasForSnapshot = 3)
+
+    // The first DB has enableStateStoreCheckpointIds = false
+    withDB(remoteDir, conf = enableChangelogCheckpointingConf,
+      useColumnFamilies = useColumnFamily,
+      enableStateStoreCheckpointIds = enableStateStoreCheckpointIds) { db =>
+      db.load(0)
+      db.put("a", "1")
+      db.commit()
+
+      // Add some change log files after the snapshot
+      for (version <- 2 to 5) {
+        db.load(version - 1)
+        db.put(version.toString, version.toString) // put "2" -> "2", "3" -> "3", ...
+        db.commit()
+      }
+
+      // doMaintenance uploads the snapshot
+      db.doMaintenance()
+
+      for (version <- 6 to 10) {
+        db.load(version - 1)
+        db.put(version.toString, version.toString)
+        db.commit()
+      }
+    }
+
+    // The second DB only reads, with checkpoint IDs enabled
+    val versionToUniqueId = new mutable.HashMap[Long, String]()
+    enableStateStoreCheckpointIds = true
+
+    // During a load() with lineage from the first RocksDB,
+    // the DB should load with the data in the first DB
+    withDB(remoteDir, conf = enableChangelogCheckpointingConf,
+      useColumnFamilies = useColumnFamily,
+      enableStateStoreCheckpointIds = enableStateStoreCheckpointIds,
+      versionToUniqueId = versionToUniqueId) { db =>
+      db.load(10, None) // When reloading, the first checkpointUniqueId is None
+      assert(toStr(db.get("a")) === "1")
+      for (version <- 2 to 10) {
+        // "2" -> "2", "3" -> "3", ...
+        assert(toStr(db.get(version.toString)) === version.toString)
+      }
+    }
+  }
+
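+  // The next test covers the same upgrade path when the loaded version is
+  // backed by a full snapshot (v.zip) instead of a changelog file.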
+  testWithChangelogCheckpointingEnabled("RocksDB Fault Tolerance: correctly handle when loading " +
+    "from version v with v.zip in checkpoint format v2") {
+    var enableStateStoreCheckpointIds = false
+    val useColumnFamily = true
+    val remoteDir = Utils.createTempDir().toString
+    new File(remoteDir).delete() // to make sure that the directory gets created
+    val enableChangelogCheckpointingConf =
+      dbConf.copy(enableChangelogCheckpointing = true, minVersionsToRetain = 20,
+        minDeltasForSnapshot = 3)
+
+    // The first DB has enableStateStoreCheckpointIds = false
+    withDB(remoteDir, conf = enableChangelogCheckpointingConf,
+      useColumnFamilies = useColumnFamily,
+      enableStateStoreCheckpointIds = enableStateStoreCheckpointIds) { db =>
+      db.load(0)
+      db.put("a", "1")
+      db.commit()
+
+      // Add some change log files after the snapshot
+      for (version <- 2 to 10) {
+        db.load(version - 1)
+        db.put(version.toString, version.toString) // put "2" -> "2", "3" -> "3", ...
+        db.commit()
+      }
+
+      // doMaintenance uploads the snapshot 10.zip
+      db.doMaintenance()
+    }
+
+    // The second DB only reads, with checkpoint IDs enabled
+    val versionToUniqueId = new mutable.HashMap[Long, String]()
+    enableStateStoreCheckpointIds = true
+
+    // During a load() with lineage from the first RocksDB,
+    // the DB should load with the data in the first DB
+    withDB(remoteDir, conf = enableChangelogCheckpointingConf,
+      useColumnFamilies = useColumnFamily,
+      enableStateStoreCheckpointIds = enableStateStoreCheckpointIds,
+      versionToUniqueId = versionToUniqueId) { db =>
+      db.load(10, None)
+      assert(toStr(db.get("a")) === "1")
+      for (version <- 2 to 10) {
+        // "2" -> "2", "3" -> "3", ...
+        assert(toStr(db.get(version.toString)) === version.toString)
+      }
+    }
+  }
+
   testWithColumnFamilies("RocksDBFileManager: delete orphan files",
     TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled =>
     withTempDir { dir =>