Commit

continue merge master
WweiL committed Oct 18, 2024
1 parent ca2e5d7 commit c6b8cb6
Showing 1 changed file with 1 addition and 236 deletions.
@@ -926,60 +926,6 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures with SharedSparkSession {
}
}

testWithChangelogCheckpointingEnabled(
"RocksDBFileManager: read and write changelog with V2 format") {
val dfsRootDir = new File(Utils.createTempDir().getAbsolutePath + "/state/1/1")
val fileManager = new RocksDBFileManager(
dfsRootDir.getAbsolutePath, Utils.createTempDir(), new Configuration)
val uuid = UUID.randomUUID().toString
val changelogWriter = fileManager.getChangeLogWriter(1, checkpointUniqueId = Some(uuid))
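// Passing a checkpointUniqueId selects the V2 changelog format exercised by this test;
// the V2 writer additionally records a lineage of (version, uniqueId) pairs.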
changelogWriter.writeLineage(Array((1, uuid)))
assert(changelogWriter.version === 1)

(1 to 5).foreach(i => changelogWriter.put(i.toString, i.toString))
(2 to 4).foreach(j => changelogWriter.delete(j.toString))

changelogWriter.commit()
val changelogReader = fileManager.getChangelogReader(1, checkpointUniqueId = Some(uuid))
assert(changelogReader.version === 1)
val entries = changelogReader.toSeq
val expectedEntries = (1 to 5).map { i =>
(RecordType.PUT_RECORD, i.toString.getBytes,
i.toString.getBytes, StateStore.DEFAULT_COL_FAMILY_NAME)
} ++ (2 to 4).map { j =>
(RecordType.DELETE_RECORD, j.toString.getBytes,
null, StateStore.DEFAULT_COL_FAMILY_NAME)
}

assert(entries.size == expectedEntries.size)
entries.zip(expectedEntries).foreach {
case (e1, e2) => assert(e1._1 === e2._1 && e1._2 === e2._2 && e1._3 === e2._3)
}
}

testWithChangelogCheckpointingEnabled(
"RocksDBFileManager: changelog reader / writer with V2 format should not be able to" +
" load a V1 changelog") {
val dfsRootDir = new File(Utils.createTempDir().getAbsolutePath + "/state/1/1")
val fileManager = new RocksDBFileManager(
dfsRootDir.getAbsolutePath, Utils.createTempDir(), new Configuration)
// First write a changelog in the V1 format (no checkpointUniqueId)
val changelogWriterV1 = fileManager.getChangeLogWriter(1)
assert(changelogWriterV1.version === 1)

(1 to 5).foreach(i => changelogWriterV1.put(i.toString, i.toString))
(2 to 4).foreach(j => changelogWriterV1.delete(j.toString))

changelogWriterV1.commit()
val fileManagerV2 = new RocksDBFileManager(
dfsRootDir.getAbsolutePath, Utils.createTempDir(), new Configuration)
val uuid = UUID.randomUUID().toString
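// Reading with a checkpointUniqueId selects the V2 changelog reader, which is expected
// to reject the V1 file written above.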
val e = intercept[SparkException] {
fileManagerV2.getChangelogReader(1, checkpointUniqueId = Some(uuid))
}
assert(e.getErrorClass == "CANNOT_LOAD_STATE_STORE.CANNOT_READ_STREAMING_STATE_FILE")
}

testWithColumnFamilies("RocksDBFileManager: create init dfs directory with " +
s"unknown number of keys",
TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled =>
@@ -1254,188 +1200,7 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures with SharedSparkSession {
assert(numRemoteLogFiles == 2)
}
}

testWithChangelogCheckpointingEnabled("RocksDB Fault Tolerance: correctly handle when there " +
"are multiple snapshot files for the same version") {
val enableStateStoreCheckpointIds = true
val useColumnFamily = true
val remoteDir = Utils.createTempDir().toString
new File(remoteDir).delete() // to make sure that the directory gets created
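// Config intent (as assumed here): a low minDeltasForSnapshot triggers snapshot uploads
// after a few changelog commits, while minVersionsToRetain keeps older versions from
// being cleaned up during maintenance.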
val enableChangelogCheckpointingConf =
dbConf.copy(enableChangelogCheckpointing = true, minVersionsToRetain = 20,
minDeltasForSnapshot = 3)

// Simulate the case where multiple snapshot files exist for the same version.
// The first DB starts from version 0 and commits with its own set of uniqueIds.
val versionToUniqueId1 = new mutable.HashMap[Long, String]()
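// versionToUniqueId1 is presumably populated by withDB with the uniqueId of each committed
// version, so a later load can follow this first DB's lineage.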
withDB(remoteDir, conf = enableChangelogCheckpointingConf,
useColumnFamilies = useColumnFamily,
enableStateStoreCheckpointIds = enableStateStoreCheckpointIds,
versionToUniqueId = versionToUniqueId1) { db =>
db.load(0)
db.put("a", "1")
db.commit()

// Add some change log files after the snapshot
for (version <- 2 to 5) {
db.load(version - 1)
db.put(version.toString, version.toString) // update "1" -> "1", "2" -> "2", ...
db.commit()
}

// doMaintenance uploads the snapshot
db.doMaintenance()

for (version <- 6 to 10) {
db.load(version - 1)
db.put(version.toString, version.toString)
db.commit()
}
}

// The second DB starts from version 0 again and commits with a different set of uniqueIds
val versionToUniqueId2 = new mutable.HashMap[Long, String]()
withDB(remoteDir, conf = enableChangelogCheckpointingConf,
useColumnFamilies = useColumnFamily,
enableStateStoreCheckpointIds = enableStateStoreCheckpointIds,
versionToUniqueId = versionToUniqueId2) { db =>
db.load(0)
db.put("b", "2")
db.commit()
// Add some change log files after the snapshot
for (version <- 2 to 5) {
db.load(version - 1)
db.put(version.toString, (version + 1).toString) // put "2" -> "3", "3" -> "4", ...
db.commit()
}

// doMaintenance uploads the snapshot
db.doMaintenance()

for (version <- 6 to 10) {
db.load(version - 1)
db.put(version.toString, (version + 1).toString)
db.commit()
}
}

// A load() that follows the lineage recorded by the first RocksDB instance
// should return the data written by the first DB
withDB(remoteDir, conf = enableChangelogCheckpointingConf,
useColumnFamilies = useColumnFamily,
enableStateStoreCheckpointIds = enableStateStoreCheckpointIds,
versionToUniqueId = versionToUniqueId1) { db =>
db.load(10)
assert(toStr(db.get("a")) === "1")
for (version <- 2 to 10) {
// "1" -> "1", "2" -> "2", ...
assert(toStr(db.get(version.toString)) === version.toString)
}
}
}

testWithChangelogCheckpointingEnabled("RocksDB Fault Tolerance: correctly handle when loading " +
"from version v with v.changelog in checkpoint format v2") {
var enableStateStoreCheckpointIds = false
val useColumnFamily = true
val remoteDir = Utils.createTempDir().toString
new File(remoteDir).delete() // to make sure that the directory gets created
val enableChangelogCheckpointingConf =
dbConf.copy(enableChangelogCheckpointing = true, minVersionsToRetain = 20,
minDeltasForSnapshot = 3)

// The first DB has enableStateStoreCheckpointIds = false
withDB(remoteDir, conf = enableChangelogCheckpointingConf,
useColumnFamilies = useColumnFamily,
enableStateStoreCheckpointIds = enableStateStoreCheckpointIds) { db =>
db.load(0)
db.put("a", "1")
db.commit()

// Add some change log files after the snapshot
for (version <- 2 to 5) {
db.load(version - 1)
db.put(version.toString, version.toString) // update "1" -> "1", "2" -> "2", ...
db.commit()
}

// doMaintenance uploads the snapshot
db.doMaintenance()

for (version <- 6 to 10) {
db.load(version - 1)
db.put(version.toString, version.toString)
db.commit()
}
}

// The second DB enables state store checkpoint IDs (checkpoint format v2)
val versionToUniqueId = new mutable.HashMap[Long, String]()
enableStateStoreCheckpointIds = true

// Loading version 10, which was written without checkpoint IDs,
// should still return the data written by the first DB
withDB(remoteDir, conf = enableChangelogCheckpointingConf,
useColumnFamilies = useColumnFamily,
enableStateStoreCheckpointIds = enableStateStoreCheckpointIds,
versionToUniqueId = versionToUniqueId) { db =>
db.load(10, None) // When reloading, the first checkpointUniqueId is None
assert(toStr(db.get("a")) === "1")
for (version <- 2 to 10) {
// "1" -> "1", "2" -> "2", ...
assert(toStr(db.get(version.toString)) === version.toString)
}
}
}

testWithChangelogCheckpointingEnabled("RocksDB Fault Tolerance: correctly handle when loading " +
"from version v with v.zip in checkpoint format v2") {
var enableStateStoreCheckpointIds = false
val useColumnFamily = true
val remoteDir = Utils.createTempDir().toString
new File(remoteDir).delete() // to make sure that the directory gets created
val enableChangelogCheckpointingConf =
dbConf.copy(enableChangelogCheckpointing = true, minVersionsToRetain = 20,
minDeltasForSnapshot = 3)

// The first DB has enableStateStoreCheckpointIds = false
withDB(remoteDir, conf = enableChangelogCheckpointingConf,
useColumnFamilies = useColumnFamily,
enableStateStoreCheckpointIds = enableStateStoreCheckpointIds) { db =>
db.load(0)
db.put("a", "1")
db.commit()

// Add some change log files after the snapshot
for (version <- 2 to 10) {
db.load(version - 1)
db.put(version.toString, version.toString) // update "1" -> "1", "2" -> "2", ...
db.commit()
}

// doMaintenance uploads the snapshot 10.zip
db.doMaintenance()
}

// The second DB enables state store checkpoint IDs (checkpoint format v2)
val versionToUniqueId = new mutable.HashMap[Long, String]()
enableStateStoreCheckpointIds = true

// Loading version 10, whose snapshot 10.zip was written without checkpoint IDs,
// should still return the data written by the first DB
withDB(remoteDir, conf = enableChangelogCheckpointingConf,
useColumnFamilies = useColumnFamily,
enableStateStoreCheckpointIds = enableStateStoreCheckpointIds,
versionToUniqueId = versionToUniqueId) { db =>
db.load(10, None)
assert(toStr(db.get("a")) === "1")
for (version <- 2 to 10) {
// "1" -> "1", "2" -> "2", ...
assert(toStr(db.get(version.toString)) === version.toString)
}
}
}


testWithStateStoreCheckpointIdsAndColumnFamilies("RocksDBFileManager: don't delete " +
s"orphan files when there is only 1 version",
TestWithBothChangelogCheckpointingEnabledAndDisabled) {
