Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-49883][SS] State Store Checkpoint Structure V2 Integration with RocksDB and RocksDBFileManager #48355

Closed
wants to merge 60 commits into from
Closed
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
60 commits
Select commit Hold shift + click to select a range
8eb89f4
review version
WweiL Oct 4, 2024
1c27324
split test
WweiL Oct 4, 2024
caf19dc
retrigger
WweiL Oct 4, 2024
2609cd2
merge master
WweiL Oct 18, 2024
ca2e5d7
retrigger
WweiL Oct 18, 2024
f7b4a29
continue merge master
WweiL Oct 18, 2024
e2bd070
Merge remote-tracking branch 'spark/master' into integration-for-review
WweiL Oct 22, 2024
59bda44
retrigger
WweiL Oct 22, 2024
f563b26
long journey to integrate with dc6f9dfe11e76f890ff2986f866853bcac2630…
WweiL Oct 22, 2024
a104ba9
cleanup
WweiL Oct 22, 2024
8d0537d
empty
WweiL Nov 7, 2024
d3f3201
merge
WweiL Nov 11, 2024
3cc3c1f
minor
WweiL Nov 11, 2024
f2f6059
comments
WweiL Nov 11, 2024
3768855
fix spark throwable suite
WweiL Nov 12, 2024
073ebf1
fix test
WweiL Nov 12, 2024
30a19b1
resolve conflicts
WweiL Nov 12, 2024
8b2b7f7
pass
WweiL Nov 18, 2024
cb16a31
v1
WweiL Nov 18, 2024
fecd88e
review
WweiL Nov 18, 2024
1aaaba5
review
WweiL Nov 18, 2024
af20cdf
comments'
WweiL Nov 19, 2024
cdebd16
minor
WweiL Nov 19, 2024
97c8356
test col family (v2)
WweiL Nov 19, 2024
e9dfe25
merge
WweiL Nov 19, 2024
09f540c
wip
WweiL Nov 20, 2024
0d32561
address Jungtaek's comment
WweiL Nov 20, 2024
8e439fc
add jira
WweiL Nov 20, 2024
3f7c5de
burak's comment, except reading version from changelog
WweiL Nov 20, 2024
0cb1787
burak's comment, add StateStoreChangelogReaderFactory
WweiL Nov 21, 2024
d3fe0b4
wip
WweiL Nov 22, 2024
64956d8
did all i can do
WweiL Nov 24, 2024
cd5941c
review
WweiL Nov 24, 2024
35074f7
retrigger
WweiL Nov 25, 2024
9dd52d4
retrigger
WweiL Nov 25, 2024
2f7abf7
Merge branch 'changelog' into integration-v2
WweiL Nov 20, 2024
d99d005
merge
WweiL Nov 25, 2024
342710b
without cleanup (with logs)
WweiL Nov 26, 2024
e93c70b
cleanup
WweiL Nov 26, 2024
3245b62
merge master
WweiL Nov 26, 2024
456da51
add v12 test
WweiL Dec 2, 2024
473e0de
addressed siying's comment
WweiL Dec 2, 2024
2ca656e
import order
WweiL Dec 2, 2024
e3c5dc5
test override to verify no opened streams
WweiL Dec 3, 2024
86e2fa8
what is this
WweiL Dec 3, 2024
1a8357f
final comments
WweiL Dec 4, 2024
9973b59
oops missed this, beforeEach should clear all open streams
WweiL Dec 4, 2024
c559e5f
merge
WweiL Dec 4, 2024
8201454
retrigger
WweiL Dec 4, 2024
83f7fde
tmp
WweiL Dec 5, 2024
6fa4177
Merge branch 'changelog' into integration-for-review
WweiL Dec 5, 2024
97d31af
siying's comments
WweiL Dec 5, 2024
a7ea81d
merge master
WweiL Dec 5, 2024
abe6056
cleanup
WweiL Dec 5, 2024
72876a8
comments
WweiL Dec 6, 2024
e7120d5
siying's comment
WweiL Dec 9, 2024
91d5e81
continue address siying's comment
WweiL Dec 9, 2024
db0a304
siying's comment
WweiL Dec 10, 2024
a973c8e
minor, the warning was too verbose for first version, change to debug
WweiL Dec 10, 2024
0273e4c
burak's comment
WweiL Dec 11, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions common/utils/src/main/resources/error/error-conditions.json
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,11 @@
"An error occurred during loading state."
],
"subClass" : {
"CANNOT_GET_LATEST_SNAPSHOT_VERSION_AND_UNIQUE_ID_FROM_LINEAGE" : {
"message" : [
"Cannot get latest snapshot version and unique ids from lineage. lineage: <lineage>, latest snapshot versions and unique ids: <snapshotVersionAndUniqueIds>."
WweiL marked this conversation as resolved.
Show resolved Hide resolved
]
},
"CANNOT_READ_CHECKPOINT" : {
"message" : [
"Cannot read RocksDB checkpoint metadata. Expected <expectedVersion>, but found <actualVersion>."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,8 @@ private[spark] object LogKeys {
case object LABEL_COLUMN extends LogKey
case object LARGEST_CLUSTER_INDEX extends LogKey
case object LAST_ACCESS_TIME extends LogKey
case object LAST_COMMITTED_CHECKPOINT_ID extends LogKey
case object LAST_COMMIT_BASED_CHECKPOINT_ID extends LogKey
case object LAST_VALID_TIME extends LogKey
case object LATEST_BATCH_ID extends LogKey
case object LATEST_COMMITTED_BATCH_ID extends LogKey
Expand All @@ -359,8 +361,10 @@ private[spark] object LogKeys {
case object LEFT_EXPR extends LogKey
case object LEFT_LOGICAL_PLAN_STATS_SIZE_IN_BYTES extends LogKey
case object LINE extends LogKey
case object LINEAGE extends LogKey
case object LINE_NUM extends LogKey
case object LISTENER extends LogKey
case object LOADED_CHECKPOINT_ID extends LogKey
case object LOADED_VERSION extends LogKey
case object LOAD_FACTOR extends LogKey
case object LOAD_TIME extends LogKey
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2605,6 +2605,17 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase with ExecutionE
cause = null)
}

def cannotGetLatestSnapshotVersionAndUniqueIdFromLineage(
lineage: String, snapshotVersionAndUniqueIds: String): Throwable = {
new SparkException (
errorClass =
"CANNOT_LOAD_STATE_STORE.CANNOT_GET_LATEST_SNAPSHOT_VERSION_AND_UNIQUE_ID_FROM_LINEAGE",
messageParameters = Map(
"lineage" -> lineage,
"snapshotVersionAndUniqueIds" -> snapshotVersionAndUniqueIds),
cause = null)
}

def unexpectedFileSize(
dfsFile: Path,
localFile: File,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -513,6 +513,7 @@ class MicroBatchExecution(
execCtx.startOffsets ++= execCtx.endOffsets
watermarkTracker.setWatermark(
math.max(watermarkTracker.currentWatermark, commitMetadata.nextBatchWatermarkMs))
currentStateStoreCkptId ++= commitMetadata.stateUniqueIds
WweiL marked this conversation as resolved.
Show resolved Hide resolved
} else if (latestCommittedBatchId == latestBatchId - 1) {
execCtx.endOffsets.foreach {
case (source: Source, end: Offset) =>
Expand Down Expand Up @@ -965,7 +966,8 @@ class MicroBatchExecution(
updateStateStoreCkptId(execCtx, latestExecPlan)
}
execCtx.reportTimeTaken("commitOffsets") {
if (!commitLog.add(execCtx.batchId, CommitMetadata(watermarkTracker.currentWatermark))) {
if (!commitLog.add(execCtx.batchId,
CommitMetadata(watermarkTracker.currentWatermark, currentStateStoreCkptId.toMap))) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it is V1, whaat is filled here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as commented above. It is better to fill a None if it is it is V1.
Did you validate that for V1, the commit log generated can be read by an older version?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you validate that for V1, the commit log generated can be read by an older version?
Yes this is verified in the previous PR

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update: Did you mean an old spark version reading a new commit log created (but under V1)? This is not verified in the previous PR and I will verify and will file a separate PR if needed

Copy link
Contributor

@siying siying Dec 4, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[UPDATE: I was wrong. Ignore it.] Further clarify: I mean the commit log generated by the new code with V1 should be readable by older code. It is likely to be OK either way (if we doesn't work we need to bump up the version), but I think having a Noneable field is more straight-forward.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Never mind. I was wrong. I missed that you created class CommitLogLegacy in your unit test, so we are all good!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@WweiL there must be a misunderstanding here. My previous comment

Same as commented above. It is better to fill a None if it is it is V1.
Did you validate that for V1, the commit log generated can be read by an older version?

is two different comments. Even if validation for V1 is done, I still hope None is used for V1, and I've explained it above. See my explanation above https://github.com/apache/spark/pull/48355/files#r1870082653
I didn't see it addressed yet.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@siying I will address this in another PR, depending on if this is merged or yet I might create a separate one:
#49063
Let's scope this PR and ignore this for now

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@WweiL I perhaps overlooked it in the previous code review, but this is a format change, so we need to fix it as soon as we can. We can separate this to another PR if it can make the format fix merged sooner.

throw QueryExecutionErrors.concurrentStreamLogUpdate(execCtx.batchId)
}
}
Expand Down
Loading