Skip to content

Commit

Permalink
[SPARK-50339][SPARK-50360][SS] Enable changelog to store lineage info…
Browse files Browse the repository at this point in the history
…rmation

### What changes were proposed in this pull request?

Break down apache#48355 into smaller PRs.

## Changelog Reader / Writer
We propose to save the lineage in the first line of the changelog files.

For the changelog reader, an abstract function `readLineage` is added. In the `RocksDBCheckpointManager.getChangelogReader` function, `readLineage` is called right after the changelog reader is initialized, which advances the file pointer past the lineage. Subsequent `getNext` calls are therefore not affected.

For changelog writer, there is an abstract function `writeLineage` that writes the lineage. This function will be called before any actual changelog data is written in `RocksDB.load()`.

The lineage is stored as json.

### Why are the changes needed?

Continue development of SPARK-49374

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Added unit test

### Was this patch authored or co-authored using generative AI tooling?

No

Closes apache#48880 from WweiL/changelog.

Authored-by: WweiL <z920631580@gmail.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
  • Loading branch information
WweiL authored and HeartSaVioR committed Dec 5, 2024
1 parent 7278bc7 commit af4f37c
Show file tree
Hide file tree
Showing 6 changed files with 565 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@
},
"INVALID_CHANGE_LOG_READER_VERSION" : {
"message" : [
"The change log reader version cannot be <version>."
"The change log reader version cannot be <version>. The checkpoint probably is from a future Spark version, please upgrade your Spark."
]
},
"INVALID_CHANGE_LOG_WRITER_VERSION" : {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,7 @@ class RocksDB(
log"${MDC(LogKeys.VERSION_NUM, v)}")
var changelogReader: StateStoreChangelogReader = null
try {
changelogReader = fileManager.getChangelogReader(v, useColumnFamilies)
changelogReader = fileManager.getChangelogReader(v)
changelogReader.foreach { case (recordType, key, value) =>
recordType match {
case RecordType.PUT_RECORD =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,58 +153,73 @@ class RocksDBFileManager(
@volatile private var rootDirChecked: Boolean = false
private val versionToRocksDBFiles = new ConcurrentHashMap[Long, Seq[RocksDBImmutableFile]]

private def getChangelogVersion(useColumnFamilies: Boolean): Short = {
val changelogVersion: Short = if (useColumnFamilies) {
2
} else {
1
/**
* Get the changelog version based on rocksDB features.
* @return the version of changelog
*/
private def getChangelogWriterVersion(
useColumnFamilies: Boolean,
stateStoreCheckpointIdEnabled: Boolean): Short = {
(useColumnFamilies, stateStoreCheckpointIdEnabled) match {
case (false, false) => 1
case (true, false) => 2
case (false, true) => 3
case _ => 4
}
changelogVersion
}

def getChangeLogWriter(
version: Long,
useColumnFamilies: Boolean = false): StateStoreChangelogWriter = {
val changelogFile = dfsChangelogFile(version)
useColumnFamilies: Boolean = false,
checkpointUniqueId: Option[String] = None,
stateStoreCheckpointIdLineage: Option[Array[LineageItem]] = None
): StateStoreChangelogWriter = {
val changelogFile = dfsChangelogFile(version, checkpointUniqueId)
if (!rootDirChecked) {
val rootDir = new Path(dfsRootDir)
if (!fm.exists(rootDir)) fm.mkdirs(rootDir)
rootDirChecked = true
}

val changelogVersion = getChangelogVersion(useColumnFamilies)
val enableStateStoreCheckpointIds = checkpointUniqueId.isDefined
val changelogVersion = getChangelogWriterVersion(
useColumnFamilies, enableStateStoreCheckpointIds)
val changelogWriter = changelogVersion match {
case 1 =>
new StateStoreChangelogWriterV1(fm, changelogFile, codec)
case 2 =>
new StateStoreChangelogWriterV2(fm, changelogFile, codec)
case 3 =>
assert(enableStateStoreCheckpointIds && stateStoreCheckpointIdLineage.isDefined,
"StateStoreChangelogWriterV3 should only be initialized when " +
"state store checkpoint unique id is enabled")
new StateStoreChangelogWriterV3(fm, changelogFile, codec, stateStoreCheckpointIdLineage.get)
case 4 =>
assert(enableStateStoreCheckpointIds && stateStoreCheckpointIdLineage.isDefined,
"StateStoreChangelogWriterV4 should only be initialized when " +
"state store checkpoint unique id is enabled")
new StateStoreChangelogWriterV4(fm, changelogFile, codec, stateStoreCheckpointIdLineage.get)
case _ =>
throw QueryExecutionErrors.invalidChangeLogWriterVersion(changelogVersion)
}

logInfo(log"Loaded change log reader version " +
log"${MDC(LogKeys.FILE_VERSION, changelogWriter.version)}")

changelogWriter
}

// Get the changelog file at version
def getChangelogReader(
version: Long,
useColumnFamilies: Boolean = false): StateStoreChangelogReader = {
val changelogFile = dfsChangelogFile(version)

// Note that ideally we should get the version for the reader from the
// changelog itself. However, since we don't record this for v1, we need to
// rely on external arguments to make this call today. Within the reader, we verify
// for the correctness of the decided/expected version. We might revisit this pattern
// as we add more changelog versions in the future.
val changelogVersion = getChangelogVersion(useColumnFamilies)
val changelogReader = changelogVersion match {
case 1 =>
new StateStoreChangelogReaderV1(fm, changelogFile, codec)
case 2 =>
new StateStoreChangelogReaderV2(fm, changelogFile, codec)
case _ =>
throw QueryExecutionErrors.invalidChangeLogReaderVersion(changelogVersion)
}
changelogReader
checkpointUniqueId: Option[String] = None): StateStoreChangelogReader = {
val changelogFile = dfsChangelogFile(version, checkpointUniqueId)
val reader = new StateStoreChangelogReaderFactory(fm, changelogFile, codec)
.constructChangelogReader()

logInfo(log"Loaded change log reader version ${MDC(LogKeys.FILE_VERSION, reader.version)}")

reader
}

/**
Expand Down Expand Up @@ -777,7 +792,9 @@ class RocksDBFileManager(
private def dfsBatchZipFile(version: Long): Path = new Path(s"$dfsRootDir/$version.zip")
// We use changelog suffix intentionally so that we can tell the difference from changelog file of
// HDFSBackedStateStore which is named version.delta.
private def dfsChangelogFile(version: Long): Path = new Path(s"$dfsRootDir/$version.changelog")
private def dfsChangelogFile(version: Long, checkpointUniqueId: Option[String] = None): Path =
checkpointUniqueId.map(id => new Path(s"$dfsRootDir/${version}_$id.changelog"))
.getOrElse(new Path(s"$dfsRootDir/$version.changelog"))

private def localMetadataFile(parentDir: File): File = new File(parentDir, "metadata")

Expand Down
Loading

0 comments on commit af4f37c

Please sign in to comment.