From c6b8cb63a7e5cc45b0177a00d14c1a95153fe68c Mon Sep 17 00:00:00 2001 From: Wei Liu Date: Fri, 18 Oct 2024 14:06:17 -0400 Subject: [PATCH] continue merge master --- .../streaming/state/RocksDBSuite.scala | 237 +----------------- 1 file changed, 1 insertion(+), 236 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala index 824877d4bdbc0..1bbb68b3f2cab 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala @@ -926,60 +926,6 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures with SharedSparkSession { } } - testWithChangelogCheckpointingEnabled( - "RocksDBFileManager: read and write changelog with V2 format") { - val dfsRootDir = new File(Utils.createTempDir().getAbsolutePath + "/state/1/1") - val fileManager = new RocksDBFileManager( - dfsRootDir.getAbsolutePath, Utils.createTempDir(), new Configuration) - val uuid = UUID.randomUUID().toString - val changelogWriter = fileManager.getChangeLogWriter(1, checkpointUniqueId = Some(uuid)) - changelogWriter.writeLineage(Array((1, uuid))) - assert(changelogWriter.version === 1) - - (1 to 5).foreach(i => changelogWriter.put(i.toString, i.toString)) - (2 to 4).foreach(j => changelogWriter.delete(j.toString)) - - changelogWriter.commit() - val changelogReader = fileManager.getChangelogReader(1, checkpointUniqueId = Some(uuid)) - assert(changelogReader.version === 1) - val entries = changelogReader.toSeq - val expectedEntries = (1 to 5).map { i => - (RecordType.PUT_RECORD, i.toString.getBytes, - i.toString.getBytes, StateStore.DEFAULT_COL_FAMILY_NAME) - } ++ (2 to 4).map { j => - (RecordType.DELETE_RECORD, j.toString.getBytes, - null, StateStore.DEFAULT_COL_FAMILY_NAME) - } - - assert(entries.size == expectedEntries.size) - entries.zip(expectedEntries).map{ - case (e1, e2) => assert(e1._1 === e2._1 && e1._2 === e2._2 && e1._3 === e2._3) - } - } - - testWithChangelogCheckpointingEnabled( - "RocksDBFileManager: changelog reader / writer with V2 format should not be able to" + - " load a V1 changelog") { - val dfsRootDir = new File(Utils.createTempDir().getAbsolutePath + "/state/1/1") - val fileManager = new RocksDBFileManager( - dfsRootDir.getAbsolutePath, Utils.createTempDir(), new Configuration) - // First write with V2 - val changelogWriterV1 = fileManager.getChangeLogWriter(1) - assert(changelogWriterV1.version === 1) - - (1 to 5).foreach(i => changelogWriterV1.put(i.toString, i.toString)) - (2 to 4).foreach(j => changelogWriterV1.delete(j.toString)) - - changelogWriterV1.commit() - val fileManagerV2 = new RocksDBFileManager( - dfsRootDir.getAbsolutePath, Utils.createTempDir(), new Configuration) - val uuid = UUID.randomUUID().toString - val e = intercept[SparkException] { - fileManagerV2.getChangelogReader(1, checkpointUniqueId = Some(uuid)) - } - assert(e.getErrorClass == "CANNOT_LOAD_STATE_STORE.CANNOT_READ_STREAMING_STATE_FILE") - } - testWithColumnFamilies("RocksDBFileManager: create init dfs directory with " + s"unknown number of keys", TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled => @@ -1254,188 +1200,7 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures with SharedSparkSession { assert(numRemoteLogFiles == 2) } } - - testWithChangelogCheckpointingEnabled("RocksDB Fault Tolerance: correctly handle when there " + - "are multiple snapshot files for the same version") { - val enableStateStoreCheckpointIds = true - val useColumnFamily = true - val remoteDir = Utils.createTempDir().toString - new File(remoteDir).delete() // to make sure that the directory gets created - val enableChangelogCheckpointingConf = - dbConf.copy(enableChangelogCheckpointing = true, minVersionsToRetain = 20, - minDeltasForSnapshot = 3) - - // Simulate when there are multiple snapshot files for the same version - // The first DB writes to version 0 with uniqueId - val versionToUniqueId1 = new mutable.HashMap[Long, String]() - withDB(remoteDir, conf = enableChangelogCheckpointingConf, - useColumnFamilies = useColumnFamily, - enableStateStoreCheckpointIds = enableStateStoreCheckpointIds, - versionToUniqueId = versionToUniqueId1) { db => - db.load(0) - db.put("a", "1") - db.commit() - - // Add some change log files after the snapshot - for (version <- 2 to 5) { - db.load(version - 1) - db.put(version.toString, version.toString) // update "1" -> "1", "2" -> "2", ... - db.commit() - } - - // doMaintenance uploads the snapshot - db.doMaintenance() - - for (version <- 6 to 10) { - db.load(version - 1) - db.put(version.toString, version.toString) - db.commit() - } - } - - // The second DB writes to version 0 with another uniqueId - val versionToUniqueId2 = new mutable.HashMap[Long, String]() - withDB(remoteDir, conf = enableChangelogCheckpointingConf, - useColumnFamilies = useColumnFamily, - enableStateStoreCheckpointIds = enableStateStoreCheckpointIds, - versionToUniqueId = versionToUniqueId2) { db => - db.load(0) - db.put("b", "2") - db.commit() - // Add some change log files after the snapshot - for (version <- 2 to 5) { - db.load(version - 1) - db.put(version.toString, (version + 1).toString) // update "1" -> "1", "2" -> "2", ... - db.commit() - } - - // doMaintenance uploads the snapshot - db.doMaintenance() - - for (version <- 6 to 10) { - db.load(version - 1) - db.put(version.toString, (version + 1).toString) - db.commit() - } - } - - // During a load() with linage from the first rocksDB, - // the DB should load with data in the first db - withDB(remoteDir, conf = enableChangelogCheckpointingConf, - useColumnFamilies = useColumnFamily, - enableStateStoreCheckpointIds = enableStateStoreCheckpointIds, - versionToUniqueId = versionToUniqueId1) { db => - db.load(10) - assert(toStr(db.get("a")) === "1") - for (version <- 2 to 10) { - // "1" -> "1", "2" -> "2", ... - assert(toStr(db.get(version.toString)) === version.toString) - } - } - } - - testWithChangelogCheckpointingEnabled("RocksDB Fault Tolerance: correctly handle when loading " + - "from version v with v.changelog in checkpoint format v2") { - var enableStateStoreCheckpointIds = false - val useColumnFamily = true - val remoteDir = Utils.createTempDir().toString - new File(remoteDir).delete() // to make sure that the directory gets created - val enableChangelogCheckpointingConf = - dbConf.copy(enableChangelogCheckpointing = true, minVersionsToRetain = 20, - minDeltasForSnapshot = 3) - - // The first DB has enableStateStoreCheckpointIds = false - withDB(remoteDir, conf = enableChangelogCheckpointingConf, - useColumnFamilies = useColumnFamily, - enableStateStoreCheckpointIds = enableStateStoreCheckpointIds) { db => - db.load(0) - db.put("a", "1") - db.commit() - - // Add some change log files after the snapshot - for (version <- 2 to 5) { - db.load(version - 1) - db.put(version.toString, version.toString) // update "1" -> "1", "2" -> "2", ... - db.commit() - } - - // doMaintenance uploads the snapshot - db.doMaintenance() - - for (version <- 6 to 10) { - db.load(version - 1) - db.put(version.toString, version.toString) - db.commit() - } - } - - // The second DB writes to version 0 with another uniqueId - val versionToUniqueId = new mutable.HashMap[Long, String]() - enableStateStoreCheckpointIds = true - - // During a load() with linage from the first rocksDB, - // the DB should load with data in the first db - withDB(remoteDir, conf = enableChangelogCheckpointingConf, - useColumnFamilies = useColumnFamily, - enableStateStoreCheckpointIds = enableStateStoreCheckpointIds, - versionToUniqueId = versionToUniqueId) { db => - db.load(10, None) // When reloading, the first checkpointUniqueId is None - assert(toStr(db.get("a")) === "1") - for (version <- 2 to 10) { - // "1" -> "1", "2" -> "2", ... - assert(toStr(db.get(version.toString)) === version.toString) - } - } - } - - testWithChangelogCheckpointingEnabled("RocksDB Fault Tolerance: correctly handle when loading " + - "from version v with v.zip in checkpoint format v2") { - var enableStateStoreCheckpointIds = false - val useColumnFamily = true - val remoteDir = Utils.createTempDir().toString - new File(remoteDir).delete() // to make sure that the directory gets created - val enableChangelogCheckpointingConf = - dbConf.copy(enableChangelogCheckpointing = true, minVersionsToRetain = 20, - minDeltasForSnapshot = 3) - - // The first DB has enableStateStoreCheckpointIds = false - withDB(remoteDir, conf = enableChangelogCheckpointingConf, - useColumnFamilies = useColumnFamily, - enableStateStoreCheckpointIds = enableStateStoreCheckpointIds) { db => - db.load(0) - db.put("a", "1") - db.commit() - - // Add some change log files after the snapshot - for (version <- 2 to 10) { - db.load(version - 1) - db.put(version.toString, version.toString) // update "1" -> "1", "2" -> "2", ... - db.commit() - } - - // doMaintenance uploads the snapshot 10.zip - db.doMaintenance() - } - - // The second DB writes to version 0 with another uniqueId - val versionToUniqueId = new mutable.HashMap[Long, String]() - enableStateStoreCheckpointIds = true - - // During a load() with linage from the first rocksDB, - // the DB should load with data in the first db - withDB(remoteDir, conf = enableChangelogCheckpointingConf, - useColumnFamilies = useColumnFamily, - enableStateStoreCheckpointIds = enableStateStoreCheckpointIds, - versionToUniqueId = versionToUniqueId) { db => - db.load(10, None) - assert(toStr(db.get("a")) === "1") - for (version <- 2 to 10) { - // "1" -> "1", "2" -> "2", ... - assert(toStr(db.get(version.toString)) === version.toString) - } - } - } - + testWithStateStoreCheckpointIdsAndColumnFamilies("RocksDBFileManager: don't delete " + s"orphan files when there is only 1 version", TestWithBothChangelogCheckpointingEnabledAndDisabled) {