
[SPARK-49883][SS] State Store Checkpoint Structure V2 Integration with RocksDB and RocksDBFileManager #48355

Closed
wants to merge 60 commits

Conversation

@WweiL (Contributor) commented Oct 4, 2024

What changes were proposed in this pull request?

This PR enables RocksDB to read <zip, changelog> files named with unique ids (e.g. version_uniqueId.zip, version_uniqueId.changelog). Below is an explanation of the changes and the rationale.

Now, for each changelog file, we put a "version: uniqueId" lineage on its first line, covering every version from its current version back to the previous snapshot version. For each snapshot (zip) file, nothing changes other than its name (version_uniqueId.zip), because snapshot files are the single source of truth.

In addition to lastCommitBasedCheckpointId, lastCommittedCheckpointId and loadedCheckpointId added in #47895 (review), this PR also adds an in-memory instance lineageManager that holds the mapping from version to unique id. This is useful when the same RocksDB instance is reused in the executor, so the lineage does not need to be loaded from the changelog file again.
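For illustration, the first line of a changelog file such as 12_uuidC.changelog might then carry a JSON lineage like the following (the exact field names and layout are an assumption here, given only that the lineage is stored as JSON on the first line; assume the previous snapshot was version 10):

```
[{"version":10,"checkpointUniqueId":"uuidA"},
 {"version":11,"checkpointUniqueId":"uuidB"},
 {"version":12,"checkpointUniqueId":"uuidC"}]
```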

RocksDB:

Load

  • When loadedVersion != version, try to load a changelog file with version_checkpointUniqueId.changelog. There could be multiple cases:

    1. Version corresponds to a zip file:
      1. version_uniqueId.zip: either changelog was not enabled, or it was enabled but this version happens to have a snapshot (zip) file, and the previous query run used checkpoint format v2
      2. version.zip: either changelog was not enabled, or it was enabled but this version happens to have a snapshot (zip) file, and the previous query run used checkpoint format v1
    2. Version corresponds to a changelog file:
      1. version_uniqueId.changelog: changelog was enabled, and the previous query run used checkpoint format v2
      2. version.changelog: changelog was enabled, and the previous query run used checkpoint format v1
  • For case i.a, we construct a new empty lineage (version, sessionCheckpointId).

  • For case ii.a, we read the lineage stored in the first line of version_uniqueId.changelog.

  • For cases i.b and ii.b, there is no need to load the lineage because it was not present before; we just load the corresponding file without a uniqueId, while newer files will be constructed with a uniqueId. This covers the transition from checkpoint version v1 to checkpoint version v2. (A file-name resolution sketch is given at the end of this Load section.)

Next, the code finds all snapshot versions through file listing. When there are multiple snapshot files with the same version but different unique ids (the main problem this project is trying to solve), the correct one is loaded based on the checkpoint id.

Then the changelog is replayed with awareness of the lineage. The lineage is stored in memory for the next load().

Last, open the changelog writer for version + 1 and write the lineage, including (version + 1, sessionCheckpointId), to the first line of the file. While it may seem that the lineage is written early, this is safe because the changelog writer has not been committed yet.

  • When loadedVersion == version, the same RocksDB instance is reused and the lineage already kept in memory (versionToUniqueIdLineage) is used directly.
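As a rough sketch of the file-name resolution behind the four cases above (the helper below is illustrative only, not the actual method in this PR):

```scala
// Illustrative sketch: resolve the DFS file name for a (version, Option[uniqueId])
// pair. With a uniqueId (checkpoint format v2) the id is embedded in the name;
// without one (v1) the plain name is used, matching cases i.a/i.b and ii.a/ii.b.
def resolveFileName(version: Long, uniqueId: Option[String], isSnapshot: Boolean): String = {
  val suffix = if (isSnapshot) "zip" else "changelog"
  uniqueId match {
    case Some(id) => s"${version}_$id.$suffix" // e.g. 12_uuidC.changelog (ckpt v2)
    case None     => s"$version.$suffix"       // e.g. 12.changelog (ckpt v1)
  }
}
```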

Commit

  • Also save sessionCheckpointId to latestSnapshot
  • Add (newVersion, sessionCheckpointId) to versionToUniqueIdLineage

Abort

Also clear up versionToUniqueIdLineage

RocksDBFileManager:

  • A number of additions to make the code uniqueId-aware. Every place that used to return a version now returns a <version, Option[uniqueId]> pair; in the v1 format, the option is None.

deleteOldVersions:

If there are multiple files for the same version, e.g. version_uniqueId1.zip (or .changelog) and version_uniqueId2.zip, all of them are deleted.
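A minimal sketch of that deletion rule, with illustrative helper names (the real code works on listed DFS files, not plain strings):

```scala
// Sketch: parse the version prefix regardless of whether a uniqueId suffix is
// present, then delete every file whose version is below the retention threshold,
// no matter which uniqueId it carries.
def versionOf(fileName: String): Long =
  fileName.stripSuffix(".zip").stripSuffix(".changelog").split("_").head.toLong

def filesToDelete(files: Seq[String], minVersionToRetain: Long): Seq[String] =
  files.filter(f => versionOf(f) < minVersionToRetain)
```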

Changelog Reader / Writer

We propose to save the lineage in the first line of the changelog files.

For the changelog reader, there is an abstract function readLineage. In the RocksDBCheckpointManager.getChangelogReader function, readLineage is called right after the changelog reader is initialized, to advance the file pointer past the lineage. Subsequent getNext calls are therefore not affected.

For the changelog writer, there is an abstract function writeLineage that writes the lineage. It is called from RocksDB.load(), before any actual changelog data is written.
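A simplified sketch of that reader/writer contract (readLineage and writeLineage are the PR's function names; the JSON shape and stream handling below are assumptions for illustration):

```scala
import java.io.{BufferedReader, BufferedWriter}

case class LineageItem(version: Long, checkpointUniqueId: String)

// The lineage occupies the first line of the changelog file: the writer emits it
// before any changelog record, and the reader consumes it right after opening the
// file, so subsequent getNext() calls start at the first real record.
def writeLineage(out: BufferedWriter, lineage: Seq[LineageItem]): Unit = {
  val json = lineage
    .map(i => s"""{"version":${i.version},"checkpointUniqueId":"${i.checkpointUniqueId}"}""")
    .mkString("[", ",", "]")
  out.write(json)
  out.newLine()
}

def readLineage(in: BufferedReader): String =
  in.readLine() // advances the stream past the lineage line
```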

Why are the changes needed?

Improve fault tolerance of the RocksDB state store.

Does this PR introduce any user-facing change?

Not yet. After the V2 format project is finished, customers can use the new config to enable it and get better RocksDB state store fault tolerance.

How was this patch tested?

Modified existing unit tests. For unit tests and backward compatibility tests please refer to: #48356

Was this patch authored or co-authored using generative AI tooling?

No

@WweiL (Contributor Author) commented Oct 4, 2024

This PR depends on #47932 and #47895

@WweiL WweiL changed the title [DO-NOT-REVIEW] RocksDB StateV2 integration [DO-NOT-REVIEW] [SPARK-49883] State Store Checkpoint Structure V2 Integration with RocksDB and RocksDBFileManager Oct 4, 2024
@WweiL WweiL changed the title [DO-NOT-REVIEW] [SPARK-49883] State Store Checkpoint Structure V2 Integration with RocksDB and RocksDBFileManager [DO-NOT-REVIEW] [SPARK-49883][SS] State Store Checkpoint Structure V2 Integration with RocksDB and RocksDBFileManager Oct 4, 2024
@WweiL WweiL force-pushed the integration-for-review branch from c6b8cb6 to f7b4a29 Compare October 18, 2024 18:14
@WweiL WweiL changed the title [DO-NOT-REVIEW] [SPARK-49883][SS] State Store Checkpoint Structure V2 Integration with RocksDB and RocksDBFileManager [SPARK-49883][SS] State Store Checkpoint Structure V2 Integration with RocksDB and RocksDBFileManager Oct 22, 2024
@WweiL WweiL marked this pull request as ready for review November 11, 2024 15:29
/**
* In the case of checkpointFormatVersion 2, if we find multiple latest snapshot files of
* the same version but they have different uniqueIds, we need to find the correct one based on
* the lineage we have.
Contributor

Not sure whether the comment is wrong or the implementation is wrong, but no matter whether we find multiple latest snapshot files, we need to use the correct one. If it doesn't exist, we should not use it.

Contributor Author

This comment means the same as what you said; can you tell me where the confusion is?

@@ -965,7 +966,8 @@ class MicroBatchExecution(
updateStateStoreCkptId(execCtx, latestExecPlan)
}
execCtx.reportTimeTaken("commitOffsets") {
if (!commitLog.add(execCtx.batchId, CommitMetadata(watermarkTracker.currentWatermark))) {
if (!commitLog.add(execCtx.batchId,
CommitMetadata(watermarkTracker.currentWatermark, currentStateStoreCkptId.toMap))) {
Contributor

I don't get your comment. When customMetrics is empty, it means we have no metric reported. But here in the commit log, it is better if we can distinguish between "there are no shuffle partitions" and "there are shuffle partitions, but we use V1 so there is no checkpoint ID". It provides additional information to sanity check whether something has gone wrong.
The commit log is an on-disk format, so it is hard to change. We should hold it to a higher standard, because it cannot easily be refactored.

* the latest snapshot (version, uniqueId) pair from file listing, this function finds
* the ground truth latest snapshot (version, uniqueId) the db instance needs to load.
*
* @param lineage the ground truth lineage loaded from changelog file
Contributor

Is lineage required to be sorted in descending order of version? If it is, please add a comment for that requirement. I actually think it may be a good idea not to make the assumption and just sort it here, to guarantee we match the largest version.

Contributor Author

Lineage is updated when:

  1. in commit, lineageManager.appendLineageItem(LineageItem(newVersion, sessionStateStoreCkptId.get)), where newVersion is loadedVersion + 1
  2. in load, append current (version, id) to the lineage loaded from the changelog.

So logically it is sorted, but I could add a sort per your request

@WweiL (Contributor Author) commented Dec 5, 2024

After the refactor here

* @param lineage the ground truth lineage loaded from changelog file

lineage being sorted is not an enforced requirement because there is a sort in the function itself.

I still sorted the array here:

currVersionLineage = currVersionLineage.sortBy(_.version)

because I believe this is the only place a sort is needed, based on searching through where appendLineage and resetLineage are used.

@@ -965,7 +966,8 @@ class MicroBatchExecution(
updateStateStoreCkptId(execCtx, latestExecPlan)
}
execCtx.reportTimeTaken("commitOffsets") {
if (!commitLog.add(execCtx.batchId, CommitMetadata(watermarkTracker.currentWatermark))) {
if (!commitLog.add(execCtx.batchId,
CommitMetadata(watermarkTracker.currentWatermark, currentStateStoreCkptId.toMap))) {
@siying (Contributor) commented Dec 4, 2024

@WweiL [UPDATE: I was wrong. Ignore it.] The added test is great. This is not what I meant in the comment though. I didn't mean downgrading to V1 in the same release, but downgrading to an older release where the code of CommitMetadata doesn't have the extra column added here.

HeartSaVioR pushed a commit that referenced this pull request Dec 5, 2024
…rmation

### What changes were proposed in this pull request?

Break down #48355 into smaller PRs.

## Changelog Reader / Writer
We propose to save the lineage in the first line of the changelog files.

For the changelog reader, there is an abstract function `readLineage`. In the `RocksDBCheckpointManager.getChangelogReader` function, `readLineage` is called right after the changelog reader is initialized, to advance the file pointer past the lineage. Subsequent `getNext` calls are therefore not affected.

For the changelog writer, there is an abstract function `writeLineage` that writes the lineage. It is called from `RocksDB.load()`, before any actual changelog data is written.

The lineage is stored as JSON.

### Why are the changes needed?

Continue development of SPARK-49374

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Added unit test

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #48880 from WweiL/changelog.

Authored-by: WweiL <z920631580@gmail.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
case None =>
  logWarning("Cannot find latest snapshot based on lineage: "
    + printLineageItems(currVersionLineage))
  (0L, None)
Contributor Author

@siying
After the refactor requested, the error "cannot_load_base_snapshot" is removed

def cannotFindBaseSnapshotCheckpoint(lineage: String): Throwable = {
    new SparkException (
      errorClass =
        "CANNOT_LOAD_STATE_STORE.CANNOT_FIND_BASE_SNAPSHOT_CHECKPOINT",
      messageParameters = Map("lineage" -> lineage),
      cause = null)
  }

When there is no valid value returned from getLatestSnapshotVersionAndUniqueIdFromLineage, we use (0L, None). This covers the following scenario (sketched below):

  1. load(0, None), load(1, id1), load(2, id2), load(3, id3)
  2. commit -> now only the 4_id4.zip file is present
  3. maintenance -> 4_id4.zip is uploaded and becomes the ground truth
  4. load(1, id1) -> now getLatestSnapshotVersionAndUniqueIdFromLineage returns nothing, because the lineage loaded from 1_id1.changelog only contains <1, id1>, while file listing returns <4, id4>.
    Now you have to reload from the beginning and use 0 as latestSnapshotVersion.
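A sketch of that fallback, with illustrative names (the final code adds a stricter check, discussed in the follow-up comments below):

```scala
case class LineageItem(version: Long, checkpointUniqueId: String)

// Intersect the snapshots found by file listing with the lineage read from the
// changelog and take the newest match; fall back to (0L, None), i.e. "replay from
// scratch", when nothing in the lineage is present on DFS (the scenario above).
def latestSnapshotVersionAndUniqueId(
    listedSnapshots: Seq[(Long, String)],
    lineage: Seq[LineageItem]): (Long, Option[String]) = {
  val inLineage = lineage.map(i => (i.version, i.checkpointUniqueId)).toSet
  listedSnapshots
    .filter(inLineage.contains)
    .sortBy(_._1)
    .lastOption
    .map { case (v, id) => (v, Some(id)) }
    .getOrElse((0L, None))
}
```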

Contributor

The handling looks OK but can you explain why cannot_load_base_snapshot is removed? We still need to throw it if none of 1-4 is applicable?

Contributor Author

Yeah, I think this is a flaw that goes deeper into the RocksDB changelog... because we don't have an empty 0_id0.zip file.

Now, when there are no matches, there could be two cases (even without file listing I believe this still persists):

  1. When you load(v, id) but no snapshot has been created yet.
  2. When snapshots have been created but the lineage doesn't match.

For 1 we just need to set latestSnapshot to 0 and latestSnapshotUniqueId to None for now. For 2 we should throw an error.

How about fixing it like this:

latestSnapshotVersionsAndUniqueId match {
  case Some(pair) => pair
  case None if currVersionLineage.head.version == 1 && !fileManager.existsSnapshotFile(1, id1) =>
    logWarning("Cannot find latest snapshot based on lineage: "
      + printLineageItems(currVersionLineage))
    (0L, None)
  case None => throw cannot_load_base_snapshot
}

When the first version in currentLineage is 1, it means no snapshot has been created yet, and we use 0 as the default. In other cases we throw.

This looks a bit messy though.

Contributor

@WweiL whether to throw an error is determined by whether we are able to load a correct checkpoint. The condition should be simple:

  1. we have a valid snapshot for version v, and all valid changelog files v+1, v+2, ..., versionToLoad, or
  2. we have changelog files 1, 2, 3, ..., versionToLoad.

If we cannot satisfy either, there is no way we can load a checkpoint, so we should throw an error.

Contributor

I think your proposed change is good if you remove the && !fileManager.existsSnapshotFile(1, id1) part.

Contributor Author

Thanks, changed

@@ -965,7 +966,8 @@ class MicroBatchExecution(
updateStateStoreCkptId(execCtx, latestExecPlan)
}
execCtx.reportTimeTaken("commitOffsets") {
if (!commitLog.add(execCtx.batchId, CommitMetadata(watermarkTracker.currentWatermark))) {
if (!commitLog.add(execCtx.batchId,
CommitMetadata(watermarkTracker.currentWatermark, currentStateStoreCkptId.toMap))) {
Contributor

@WweiL there must be a misunderstanding here. My previous comment

Same as commented above. It is better to fill in a None if it is V1.
Did you validate that for V1, the commit log generated can be read by an older version?

is two different comments. Even if the validation for V1 is done, I still hope None is used for V1, and I've explained why above. See my explanation at https://github.com/apache/spark/pull/48355/files#r1870082653
I didn't see it addressed yet.
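To make the distinction concrete, a sketch of the shape being asked for (the value type of stateUniqueIds below is illustrative; the PR may use a different one):

```scala
// Sketch: an Option lets a V1 run write None (no checkpoint ids at all), which is
// distinguishable from a V2 run that simply has zero shuffle partitions
// (Some(Map.empty)).
case class CommitMetadata(
    nextBatchWatermarkMs: Long = 0,
    stateUniqueIds: Option[Map[Long, Array[Array[String]]]] = None)
```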

snapshot: RocksDB#RocksDBSnapshot,
fileManager: RocksDBFileManager,
snapshotsPendingUpload: Set[RocksDBVersionSnapshotInfo],
loggingId: String): RocksDBFileManagerMetrics = {
Contributor

@WweiL Oh I got it. It now needs to access lineageManager.

recordedMetrics = None
logInfo(log"Loading ${MDC(LogKeys.VERSION_NUM, version)} with stateStoreCkptId: ${
MDC(LogKeys.UUID, stateStoreCkptId.getOrElse(""))}")
if (enableStateStoreCheckpointIds) {
Contributor

This should key off whether stateStoreCkptId is non-empty, rather than enableStateStoreCheckpointIds. Whether or not we support V1->V2 compatibility, the source of truth for what we can load is the stateStoreCkptId loaded from the commit log, not the configuration, which is more about what version should be written, not what is read. Even if we don't support V1->V2, we should still use stateStoreCkptId but add a check and explicitly fail the query, saying it is not supported. This is perhaps already done in the driver?

@WweiL (Contributor Author) commented Dec 6, 2024

We also cannot do it here because at version 0 the snapshot unique id is None; then you would always use loadV1().

Contributor

Good point. Can we have a special treatment of version 0 here?

Contributor Author

sure

stateStoreCkptId: Option[String],
readOnly: Boolean = false): RocksDB = {
// An array contains lineage information from [snapShotVersion, version] (inclusive both end)
var currVersionLineage: Array[LineageItem] = lineageManager.getLineage()
Contributor

Can it be moved inside the if below?

Contributor Author

No, this is intentionally placed here. In the case of loadedVersion == version, we don't need to reload everything from DFS, which is the most common case in practice. Then we need to reuse the existing lineage from memory.

Contributor

I see, it makes sense.
Here is an issue though: the assumption now is that currVersionLineage corresponds to loadedVersion. currVersionLineage is set in lines 339 and 345, but loadedVersion is reset in line 382, so an exception thrown in the middle would make the two inconsistent. Then you need to do both resets in the same place. Also, it is better to rename lineageManager.getLineage() to lineageManager.getLineageForCurVersion() or something like that.

Contributor Author

Can you elaborate more? I think every exception thrown would invalidate data from the memory:

} catch {
  case t: Throwable =>
    loadedVersion = -1 // invalidate loaded data
    lastCommitBasedStateStoreCkptId = None
    lastCommittedStateStoreCkptId = None
    loadedStateStoreCkptId = None
    sessionStateStoreCkptId = None
    lineageManager.clear()
    throw t

Renamed the function

Contributor

It's a good point. Then there is no correctness issue here, which is good. It still feels better for those final updates to happen in the same place, which is less prone to issues when we change the code in the future and do an early return or handle exceptions differently.

val metadata = fileManager.loadCheckpointFromDfs(latestSnapshotVersion,
workingDir, rocksDBFileMapping, latestSnapshotUniqueId)

loadedVersion = latestSnapshotVersion
Contributor

I think we should also set the lineage here, perhaps with a single entry. It needs to be set in the else case below anyway.

for (v <- loadedVersion + 1 to endVersion) {
logInfo(log"Replaying changelog on version " +
log"${MDC(LogKeys.VERSION_NUM, v)}")
log"${MDC(LogKeys.END_VERSION, versionsAndUniqueIds.lastOption.map(_._1))}")
Contributor

Should assert here that the first item of the lineage has version loadedVersion.

/**
* Initialize key metrics based on the metadata loaded from DFS and open local RocksDB.
*/
private def init(metadata: RocksDBCheckpointMetadata): Unit = {
Contributor

Probably better to call it openLocalRocksDB() or something like that.

if (stateStoreCkptId.isDefined || enableStateStoreCheckpointIds && version == 0) {
loadV2(version, stateStoreCkptId, readOnly)
} else {
loadV1(version, readOnly)
Contributor

More descriptive names would probably be better, something like loadWithCheckpointId() and loadWithoutCheckpointId().

* for the first few versions. Because they are solely loaded from changelog file.
* (i.e. with default minDeltasForSnapshot, there is only 1_uuid1.changelog, no 1_uuid1.zip)
*
* The last item of "lineage" corresponds to one version before the to-be-committed version.
Contributor

Why do we need to have a non-committed version in the lineage? Can you remind me in which scenario we need to deal with an uncommitted lineage item?

Contributor Author

Ah no, this means the lineage only contains committed versions. It basically means we only append a lineage item during commit:

lineageManager.appendLineageItem(LineageItem(newVersion, sessionStateStoreCkptId.get))

Contributor

Got it. Thanks for your explanation.

@@ -313,6 +323,11 @@ class RocksDBFileManager(
metadata
}

def existsSnapshotFile(version: Long, checkpointUniqueId: Option[String] = None): Boolean = {
val path = new Path(dfsRootDir)
fm.exists(path) && fm.exists(dfsBatchZipFile(version, checkpointUniqueId))
Contributor

We should check rootDirChecked before checking dfsRootDir existence again.

HeartSaVioR pushed a commit that referenced this pull request Dec 9, 2024
…mit log

### What changes were proposed in this pull request?

Per comment from another PR: #48355 (comment)
Add a new compatibility test. This test verifies the following scenario:
A Spark running version 3.5 tries to read a commit log generated with the new changelog change but under V1.
Then the commit log would look like:
```
{"nextBatchWatermarkMs":1,"stateUniqueIds":{}}
```

But in Spark 3.5 and before, there is no such `stateUniqueIds` field, so we need to make sure queries won't throw errors in such cases.

In the new test, I create a `CommitMetadataLegacy` that only has `nextBatchWatermarkMs` and no `stateUniqueIds`, to read from a commit log as above. This simulates the scenario stated above, and the test passed.

### Why are the changes needed?

More testing

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Test only addition

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #49063 from WweiL/commitLog-followup.

Authored-by: WweiL <z920631580@gmail.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
@siying (Contributor) left a comment

LGTM. I left some easy-to-address comments. I will probably take a closer look at RocksDBSuite.scala later, but I don't have any serious comments anymore.

@brkyvz (Contributor) left a comment

LGTM, overall very small comments. Just one area where I'm concerned about potential race conditions

@@ -233,6 +233,11 @@
"An error occurred during loading state."
],
"subClass" : {
"CANNOT_FIND_BASE_SNAPSHOT_CHECKPOINT" : {
"message" : [
"Cannot find a base snapshot checkpoint. lineage: <lineage>."
Contributor

Cannot find a base snapshot checkpoint with lineage: <lineage>.

Comment on lines +1755 to +1757
def contains(item: LineageItem): Boolean = {
lineage.contains(item)
}
Contributor

How often will this be called, and how long can the list be? Do we want to keep around a map or set to make this lookup faster?

Contributor Author

It is controlled by this config:

val STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT =
  buildConf("spark.sql.streaming.stateStore.minDeltasForSnapshot")
    .internal()
    .doc("Minimum number of state store delta files that needs to be generated before they " +
      "consolidated into snapshots.")
    .version("2.0.0")
    .intConf
    .createWithDefault(10)

The default is 10, and here I believe we store at most about 2x minDeltasForSnapshot entries, so it should be a fairly small array.
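For context, a minimal stand-in for the in-memory lineage manager being discussed (the method names appear elsewhere in this PR; the implementation below is a simplification, not the actual code):

```scala
case class LineageItem(version: Long, checkpointUniqueId: String)

// Simplified stand-in: a small synchronized buffer is enough because the lineage
// stays short (roughly bounded by minDeltasForSnapshot), so a linear contains()
// is not a hot spot.
class LineageManager {
  private var lineage: Vector[LineageItem] = Vector.empty

  def appendLineageItem(item: LineageItem): Unit = synchronized { lineage :+= item }
  def resetLineage(newLineage: Seq[LineageItem]): Unit = synchronized { lineage = newLineage.toVector }
  def getLineageForCurrVersion(): Seq[LineageItem] = synchronized { lineage }
  def contains(item: LineageItem): Boolean = synchronized { lineage.contains(item) }
  def clear(): Unit = synchronized { lineage = Vector.empty }
}
```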

Comment on lines 379 to 381
val versionsAndUniqueIds = currVersionLineage
.filter(_.version > loadedVersion)
.filter(_.version <= version)
Contributor

nit: to reduce intermediate garbage objects, can you squash these into a single filter? You can even call collect on it and just make it a single operation to even get rid of the map

currVersionLineage.collect {
  case i if i.version > loadedVersion && i.version <= version => (i.version, Option(i.checkpointUniqueId))
}

Comment on lines 820 to 822
snapshot = Some(createSnapshot(
checkpointDir, newVersion, colFamilyNameToIdMap.asScala.toMap,
maxColumnFamilyId.get().toShort, sessionStateStoreCkptId))
Contributor

uber nit: one parameter per line please

Comment on lines +1241 to +1242
lineageManager.resetLineage(lineageManager.getLineageForCurrVersion()
.filter(i => i.version >= snapshot.version))
Contributor

if the upload of snapshots is async, could there be some race conditions happening here?

Contributor Author

It is possible that there are race conditions, but it is fine, because we only require the first version in the lineage to be a snapshot version, not necessarily the latest snapshot version. So this could happen:

  1. in loadWithCheckpointId(): lineage = lineageManager.getLineageForCurrVersion()
  2. uploadSnapshot() // lineageManager.resetLineage
  3. the lineage is written to the changelog file.

Now the lineage written to the changelog file actually contains two snapshot versions. But this is fine, because later, when we really need getLatestSnapshotVersionAndUniqueIdFromLineage, we list all snapshot files and find the latest one based on the lineage stored in the changelog file. So even if the lineage has two snapshot versions, we can still find the latest one.


// (version, checkpointUniqueId) -> immutable files
private val versionToRocksDBFiles =
new ConcurrentHashMap[(Long, Option[String]), Seq[RocksDBImmutableFile]]()
Contributor

nit: I probably would define a UniqueVersion class or something that can be used across everything and maybe that replaces LineageItem too. Don't need to address, but something to think about

fm.list(path, onlyZipFiles)
.map(_.getPath.getName.stripSuffix(".zip").split("_"))
.filter {
case Array(ver, id) => lineage.contains(LineageItem(ver.toLong, id))
Contributor

does this not return a MatchError if there is no _ to split on? What if you see old files using checkpoint v1?

Contributor Author

Yes, then there would be errors... Originally it was implemented in a way that handled both v1 and v2, like:

.filter {
  case Array(version, _) => xxx
  case Array(version) => xxx
}

cc @siying, who says we should separate the logic.

Comment on lines 390 to 394
.map(_.getName.stripSuffix(".changelog").split("_"))
.map {
case Array(version, _) => version.toLong
case Array(version) => version.toLong
}
Contributor

try to combine these map functions to avoid creating intermediate garbage - it may harm GC performance for low latency queries

Comment on lines +543 to +547
.map(_.getName.stripSuffix(".zip").split("_"))
.map {
case Array(version, uniqueId) => (version.toLong, Some(uniqueId))
case Array(version) => (version.toLong, None)
}
Contributor

ditto

@brkyvz (Contributor) commented Dec 13, 2024

Thank you for addressing the comments! Merging to master

@brkyvz (Contributor) commented Dec 13, 2024

I'm having trouble merging. Asking @HeartSaVioR for help in merging

@HeartSaVioR (Contributor)

I'll take over and keep helping to merge till @brkyvz gets his power again :)

Thanks! Merging to master (on behalf of @brkyvz )

ericm-db pushed a commit to ericm-db/spark that referenced this pull request Dec 17, 2024
…h RocksDB and RocksDBFileManager


Closes apache#48355 from WweiL/integration-for-review.

Lead-authored-by: WweiL <z920631580@gmail.com>
Co-authored-by: Wei Liu <wei.liu@databricks.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>