[SPARK-49884][SS] State Store Checkpoint Structure V2 Backward compatibility Tests #48356

Closed
wants to merge 5 commits into from

@WweiL WweiL commented Oct 4, 2024

What changes were proposed in this pull request?

This PR adds backward compatibility tests at both the query level and the RocksDB level. Concretely, at the query level, we test the following scenarios:

// Enable and disable changelog under ckpt v2
(changelogEnabled, ckptv2), (changelogEnabled, ckptv2)
(changelogDisabled, ckptv2), (changelogDisabled, ckptv2)

// Cross version cross changelog enabled/disabled
(changelogDisabled, ckptv1), (changelogDisabled, ckptv2)
(changelogEnabled, ckptv1), (changelogEnabled, ckptv2)
(changelogDisabled, ckptv1), (changelogEnabled, ckptv2)
(changelogEnabled, ckptv1), (changelogDisabled, ckptv2)

The query is first started with the first pair of configs, then restarted with the second pair of configs under the same checkpoint.
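
The scenario matrix above can be written down as plain data. The following is a minimal Python sketch for illustration only; `RunConf`, `SCENARIOS` and `describe` are hypothetical names and are not part of the test suite added by this PR.

```python
from dataclasses import dataclass
from typing import List, Tuple


@dataclass(frozen=True)
class RunConf:
    """Hypothetical holder for the two settings varied by the tests."""
    changelog_enabled: bool
    ckpt_version: int  # 1 or 2


# Each entry: (config for the first run, config for the restart on the same checkpoint).
SCENARIOS: List[Tuple[RunConf, RunConf]] = [
    # Same changelog setting under ckpt v2
    (RunConf(True, 2), RunConf(True, 2)),
    (RunConf(False, 2), RunConf(False, 2)),
    # Cross version, cross changelog enabled/disabled
    (RunConf(False, 1), RunConf(False, 2)),
    (RunConf(True, 1), RunConf(True, 2)),
    (RunConf(False, 1), RunConf(True, 2)),
    (RunConf(True, 1), RunConf(False, 2)),
]


def describe(first: RunConf, second: RunConf) -> str:
    """Render one scenario as a human-readable line."""
    def fmt(c: RunConf) -> str:
        state = "on" if c.changelog_enabled else "off"
        return f"changelog={state}, ckpt=v{c.ckpt_version}"
    return f"start with ({fmt(first)}), restart with ({fmt(second)})"


for first, second in SCENARIOS:
    print(describe(first, second))
```

Every scenario restarts under ckpt v2, so the matrix only exercises moving forward from v1 (or staying on v2), never downgrading.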

Why are the changes needed?

Essential test coverage.

Does this PR introduce any user-facing change?

No

How was this patch tested?

Test-only addition.

Was this patch authored or co-authored using generative AI tooling?

No


WweiL commented Oct 4, 2024

This PR depends on #48355

@WweiL WweiL changed the title from "[DO-NOT-REVIEW] Backward compatibility Tests" to "[DO-NOT-REVIEW] [SPARK-49884][SS] State Store Checkpoint Structure V2 Backward compatibility Tests" Oct 4, 2024
@WweiL WweiL changed the title from "[DO-NOT-REVIEW] [SPARK-49884][SS] State Store Checkpoint Structure V2 Backward compatibility Tests" to "[SPARK-49884][SS] State Store Checkpoint Structure V2 Backward compatibility Tests" Oct 22, 2024
@WweiL WweiL marked this pull request as ready for review October 22, 2024 23:35
@WweiL WweiL closed this Dec 5, 2024
HeartSaVioR pushed a commit that referenced this pull request Dec 13, 2024
…h RocksDB and RocksDBFileManager

### What changes were proposed in this pull request?

This PR enables RocksDB to read <zip, changelog> files watermarked with unique ids (e.g. `version_uniqueId.zip`, `version_uniqueId.changelog`). Below is an explanation of the changes and their rationale.

Now, for each changelog file, we put a "version: uniqueId" lineage on its first line, covering the range from its current version back to the previous snapshot version. For each snapshot (zip) file, there is no change other than its name (`version_uniqueId.zip`), because snapshot files are the single source of truth.

In addition to `LastCommitBasedCheckpointId, lastCommittedCheckpointId, loadedCheckpointId` added in #47895 (review), this PR also adds an in-memory map `versionToUniqueIdLineage` that maps version to unique id. This is useful when the same RocksDB instance is reused in the executor, so the lineage does not need to be loaded from the changelog file again.

## RocksDB:
#### Load
- When `loadedVersion != version`, try to load a changelog file with `version_checkpointUniqueId.changelog`. There could be multiple cases:
    1. Version corresponds to a zip file:
        1. `version_uniqueId.zip`: either changelog was not enabled, or it was enabled but the version happens to be a checkpoint file, and the query previously ran with ckpt v2
        2. `version.zip`: either changelog was not enabled, or it was enabled but the version happens to be a checkpoint file, and the query previously ran with ckpt v1
    2. Version corresponds to a changelog file:
        1. `version_uniqueId.changelog`: changelog was enabled, and the query previously ran with ckpt v2
        2. `version.changelog`: changelog was enabled, and the query previously ran with ckpt v1

- For case 1.1, we construct a new empty lineage `(version, sessionCheckpointId)`.
- For case 2.1, we read the lineage stored in `version_uniqueId.changelog`.
- For cases 1.2 and 2.2, there is no need to load the lineage because none was recorded before. We just load the corresponding file without `uniqueId`, but newer files will be constructed with `uniqueId`. This is the upgrade from checkpoint version v1 to checkpoint version v2.
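
The four naming cases above can be told apart from the file name alone. The sketch below is an illustration in Python rather than the Scala code in this PR; `StateFile`, `parse_state_file` and `checkpoint_format` are hypothetical names, and the regular expression assumes the unique id contains no `.` character.

```python
import re
from typing import NamedTuple, Optional


class StateFile(NamedTuple):
    version: int
    unique_id: Optional[str]  # None for the v1 naming scheme
    kind: str                 # "zip" (snapshot) or "changelog"


# Matches `version.zip`, `version_uniqueId.zip`, `version.changelog`,
# and `version_uniqueId.changelog`.
_NAME = re.compile(r"^(\d+)(?:_([^.]+))?\.(zip|changelog)$")


def parse_state_file(name: str) -> Optional[StateFile]:
    """Split a file name into (version, optional unique id, kind)."""
    m = _NAME.match(name)
    if m is None:
        return None
    return StateFile(int(m.group(1)), m.group(2), m.group(3))


def checkpoint_format(f: StateFile) -> str:
    """A file that carries a unique id was written under ckpt v2."""
    return "v2" if f.unique_id is not None else "v1"
```

For example, `parse_state_file("5_abc.changelog")` yields version 5 with unique id `abc`, while `parse_state_file("5.zip")` yields version 5 with no unique id, which mirrors the `<version, Option[uniqueId]>` pair described for the file manager.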

Next, the code finds the latest snapshot version through file listing. When there are multiple snapshot files with the same version but different unique ids (the main problem this project is trying to solve), the correct one is loaded based on the checkpoint id.
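
One way to picture the snapshot selection is the Python sketch below. It is an assumption-laden illustration, not the logic in this PR: `pick_snapshot` is a hypothetical function, the lineage is modeled as a plain dict from version to unique id, and v1 snapshots (no unique id) are accepted as-is.

```python
from typing import Dict, Iterable, Optional, Tuple

Snapshot = Tuple[int, Optional[str]]  # (version, unique id or None for v1)


def pick_snapshot(
    snapshots: Iterable[Snapshot],
    target_version: int,
    lineage: Dict[int, str],
) -> Optional[Snapshot]:
    """Return the newest snapshot at or below target_version whose
    unique id agrees with the lineage; v1 snapshots have no id to check."""
    candidates = [
        (version, uid)
        for version, uid in snapshots
        if version <= target_version
        and (uid is None or lineage.get(version) == uid)
    ]
    return max(candidates, key=lambda s: s[0], default=None)


# Two snapshots exist for version 5; only the one named in the lineage is chosen.
listed = [(3, "a"), (5, "x"), (5, "y"), (7, "z")]
chosen = pick_snapshot(listed, target_version=6, lineage={5: "y", 6: "q"})
```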

Then changelog is replayed with the awareness of lineage. The lineage is stored in memory for next load().

Last, load the changelog writer for version + 1, and write the lineage (version + 1, sessionCheckpointId) to the first line of the file. While it seems that the lineage is written early, it is safe because the change log writer is not committed yet.

- When `loadedVersion == version`, the same RocksDB instance is reused and the lineage is already held in memory in `versionToUniqueIdLineage`.

#### Commit
- Also save `sessionCheckpointId` to `latestSnapshot`
- Add `(newVersion, sessionCheckpointId)` to `versionToUniqueIdLineage`

#### Abort
Also clear up `versionToUniqueIdLineage`
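
The commit and abort bookkeeping can be sketched as follows. This is a simplified Python illustration; `LineageState` and the `took_snapshot` flag are hypothetical, and the real `latestSnapshot` holds more than a checkpoint id.

```python
from typing import Dict, Optional


class LineageState:
    """Hypothetical stand-in for the in-memory lineage kept by a RocksDB instance."""

    def __init__(self) -> None:
        self.version_to_unique_id: Dict[int, str] = {}
        self.latest_snapshot_id: Optional[str] = None

    def commit(self, new_version: int, session_checkpoint_id: str,
               took_snapshot: bool) -> None:
        # Assumption: the id is attached to the snapshot only when one was taken.
        if took_snapshot:
            self.latest_snapshot_id = session_checkpoint_id
        # Record the lineage entry for the version that was just committed.
        self.version_to_unique_id[new_version] = session_checkpoint_id

    def abort(self) -> None:
        # Drop the cached lineage so the next load() rebuilds it from files.
        self.version_to_unique_id.clear()
```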

## RocksDBFileManager:
- A number of additions to make the util code uniqueId-aware. All places that return a version now return a <version, Option[uniqueId]> pair; in the v1 format, the option is None.
### deleteOldVersions:
If there are multiple files for the same old version, e.g. `version_uniqueId1.zip` (or changelog) and `version_uniqueId2.zip`, all are deleted.
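
A minimal Python sketch of that rule, assuming the minimum version to retain has already been computed elsewhere; `files_to_delete` is a hypothetical helper and not the signature of `deleteOldVersions`.

```python
import re
from typing import Iterable, List

# `version.zip`, `version_uniqueId.zip`, `version.changelog`, `version_uniqueId.changelog`
_NAME = re.compile(r"^(\d+)(?:_[^.]+)?\.(?:zip|changelog)$")


def files_to_delete(names: Iterable[str], min_version_to_retain: int) -> List[str]:
    """Select every snapshot/changelog file older than the retained range,
    regardless of which unique id it carries."""
    selected = []
    for name in names:
        m = _NAME.match(name)
        if m is not None and int(m.group(1)) < min_version_to_retain:
            selected.append(name)
    return sorted(selected)


stale = files_to_delete(
    ["1_a.zip", "1_b.zip", "1_a.changelog", "2_c.zip", "3.zip", "notes.txt"],
    min_version_to_retain=2,
)
```

Here both `1_a.zip` and `1_b.zip` are selected because the decision depends only on the version, not on the unique id.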

## Changelog Reader / Writer
We propose to save the lineage to the first line of the changelog files.

For the changelog reader, there is a new abstract function `readLineage`. In the `RocksDBCheckpointManager.getChangelogReader` function, `readLineage` is called right after the changelog reader is initialized, which moves the file pointer past the lineage. Subsequent `getNext` calls are not affected by this.

For changelog writer, there is an abstract function `writeLineage` that writes the lineage. This function will be called before any actual changelog data is written in `RocksDB.load()`.
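
The reader/writer contract can be illustrated with an in-memory text stream. This is a sketch only: the comma-separated `version:uniqueId` text encoding and the helper names `write_lineage`, `read_lineage` and `records` are assumptions made for the example, not the on-disk format used by the changelog files.

```python
import io
from typing import Iterator, List, TextIO, Tuple

Lineage = List[Tuple[int, str]]  # (version, unique id), newest first


def write_lineage(out: TextIO, lineage: Lineage) -> None:
    """Write the lineage as the first line, before any changelog record."""
    out.write(",".join(f"{v}:{uid}" for v, uid in lineage) + "\n")


def read_lineage(inp: TextIO) -> Lineage:
    """Consume the first line so the stream is positioned at the first record."""
    line = inp.readline().rstrip("\n")
    if not line:
        return []
    result = []
    for item in line.split(","):
        version, uid = item.split(":", 1)
        result.append((int(version), uid))
    return result


def records(inp: TextIO) -> Iterator[str]:
    """Yield the remaining changelog records, one per line."""
    for line in inp:
        yield line.rstrip("\n")


buf = io.StringIO()
write_lineage(buf, [(6, "q"), (5, "y")])  # lineage goes first
buf.write("put k1 v1\n")                  # then the actual changes
buf.write("delete k2\n")
buf.seek(0)

lineage = read_lineage(buf)   # advances past the lineage line
changes = list(records(buf))  # sees only the change records
```

Because `read_lineage` consumes exactly the first line, code that iterates over records afterwards does not need to know the lineage exists.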

### Why are the changes needed?

Improve the fault tolerance of the RocksDB state store.

### Does this PR introduce _any_ user-facing change?

Not yet. After the V2 format project is finished, users can enable it through the new config to get better RocksDB state store fault tolerance.

### How was this patch tested?

Modified existing unit tests. For unit tests and backward compatibility tests please refer to: #48356

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #48355 from WweiL/integration-for-review.

Lead-authored-by: WweiL <z920631580@gmail.com>
Co-authored-by: Wei Liu <wei.liu@databricks.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
ericm-db pushed a commit to ericm-db/spark that referenced this pull request Dec 17, 2024
…h RocksDB and RocksDBFileManager
