[SPARK-49883][SS] State Store Checkpoint Structure V2 Integration with RocksDB and RocksDBFileManager #48355
Changes from 57 commits
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -513,6 +513,7 @@ class MicroBatchExecution( | |
execCtx.startOffsets ++= execCtx.endOffsets | ||
watermarkTracker.setWatermark( | ||
math.max(watermarkTracker.currentWatermark, commitMetadata.nextBatchWatermarkMs)) | ||
currentStateStoreCkptId ++= commitMetadata.stateUniqueIds | ||
|
||
} else if (latestCommittedBatchId == latestBatchId - 1) { | ||
execCtx.endOffsets.foreach { | ||
case (source: Source, end: Offset) => | ||
|
@@ -965,7 +966,8 @@ class MicroBatchExecution( | |
updateStateStoreCkptId(execCtx, latestExecPlan) | ||
} | ||
execCtx.reportTimeTaken("commitOffsets") { | ||
if (!commitLog.add(execCtx.batchId, CommitMetadata(watermarkTracker.currentWatermark))) { | ||
if (!commitLog.add(execCtx.batchId, | ||
CommitMetadata(watermarkTracker.currentWatermark, currentStateStoreCkptId.toMap))) { | ||
Review comments on this change:
If it is V1, what is filled here?
Same as #48355 (comment)
Same as commented above. It is better to fill a None if it is V1.
Update: Did you mean an old Spark version reading a new commit log (created under V1)? This was not verified in the previous PR. I will verify it and file a separate PR if needed.
[UPDATE: I was wrong. Ignore it.] To clarify further: I mean the commit log generated by the new code with V1 should be readable by older code. It is likely to be OK either way (if it doesn't work, we need to bump up the version), but I think having a Noneable field is more straightforward.
Never mind. I was wrong. I missed that you created
@WweiL there must be a misunderstanding here. My previous comment is two different comments. Even if validation for V1 is done, I still hope None is used for V1, and I've explained it above. See my explanation above https://github.com/apache/spark/pull/48355/files#r1870082653
@WweiL I perhaps overlooked it in the previous code review, but this is a format change, so we need to fix it as soon as we can. We can split this into another PR if that gets the format fix merged sooner.
|
||
throw QueryExecutionErrors.concurrentStreamLogUpdate(execCtx.batchId) | ||
} | ||
} | ||
|
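The reviewer's request above (fill None rather than an empty map when the checkpoint format is V1) can be illustrated with a minimal sketch. This is not the code in the PR: `SketchCommitMetadata`, `forBatch`, the `checkpointFormatVersion` parameter, and the `Seq[String]` value type are all hypothetical stand-ins chosen for illustration. Only the field names `nextBatchWatermarkMs` and `stateUniqueIds` come from the diff.

```scala
// Hypothetical, simplified stand-in for the CommitMetadata discussed in the review.
// Field names follow the diff; the map's value type is an assumption.
final case class SketchCommitMetadata(
    nextBatchWatermarkMs: Long = 0L,
    stateUniqueIds: Option[Map[Long, Seq[String]]] = None)

object SketchCommitMetadata {
  // checkpointFormatVersion is a hypothetical parameter: 1 means V1, 2 means V2.
  def forBatch(
      watermarkMs: Long,
      currentStateStoreCkptId: Map[Long, Seq[String]],
      checkpointFormatVersion: Int): SketchCommitMetadata = {
    // Under V1 there are no state store checkpoint IDs to record, so the field
    // is left as None instead of being filled with an empty map.
    val ids =
      if (checkpointFormatVersion >= 2) Some(currentStateStoreCkptId) else None
    SketchCommitMetadata(watermarkMs, ids)
  }
}
```

With an optional field, a V1 commit entry carries no checkpoint-ID payload at all, which is the property the reviewer argues makes the on-disk format easier to reason about for older readers. Whether the real serializer omits a None field is not shown in this excerpt.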