Skip to content

Commit

Permalink
Introduce new API to FileCommitProtocol allow flexible file naming
Browse files Browse the repository at this point in the history
  • Loading branch information
c21 committed Jun 22, 2021
1 parent 41af409 commit 8cc4899
Showing 1 changed file with 68 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,38 @@ abstract class FileCommitProtocol extends Logging {
* if a task is going to write out multiple files to the same dir. The file commit protocol only
* guarantees that files written by different tasks will not conflict.
*/
@deprecated("use newTaskTempFile(taskContext, dir, spec)", "3.2.0")
// Superseded by the overload taking a FileNameSpec, which can also carry a file name prefix.
def newTaskTempFile(taskContext: TaskAttemptContext, dir: Option[String], ext: String): String

/**
* Notifies the commit protocol to add a new file, and gets back the full path that should be
* used. Must be called on the executors when running tasks.
*
* Note that the returned temp file may have an arbitrary path. The commit protocol only
* promises that the file will be at the location specified by the arguments after job commit.
*
* The "dir" parameter specifies the sub-directory within the base path, used to specify
* partitioning. The "spec" parameter specifies the file name. The rest are left to the commit
* protocol implementation to decide.
*
* Important: it is the caller's responsibility to add uniquely identifying content to "spec"
* if a task is going to write out multiple files to the same dir. The file commit protocol only
* guarantees that files written by different tasks will not conflict.
*
* This API should be implemented and called instead of the deprecated
* [[newTaskTempFile(taskContext, dir, ext)]]. A default implementation is provided here to stay
* backward compatible with custom [[FileCommitProtocol]] implementations before Spark 3.2.0.
*/
def newTaskTempFile(
    taskContext: TaskAttemptContext,
    dir: Option[String],
    spec: FileNameSpec): String = {
  // Guard clause: this default implementation cannot honour a file name prefix, because the
  // legacy overload it delegates to only accepts an extension.
  if (!spec.prefix.isEmpty) {
    val protocolName = getClass.getSimpleName
    throw new UnsupportedOperationException(
      s"$protocolName.newTaskTempFile does not support file name prefix: ${spec.prefix}")
  }
  // No prefix requested: forward to the extension-only overload.
  newTaskTempFile(taskContext, dir, spec.ext)
}

/**
* Similar to newTaskTempFile(), but allows files to be committed to an absolute output location.
* Depending on the implementation, there may be weaker guarantees around adding files this way.
Expand All @@ -100,9 +130,38 @@ abstract class FileCommitProtocol extends Logging {
* if a task is going to write out multiple files to the same dir. The file commit protocol only
* guarantees that files written by different tasks will not conflict.
*/
@deprecated(
  message = "use newTaskTempFileAbsPath(taskContext, absoluteDir, spec)",
  since = "3.2.0")
// Extension-only variant; the FileNameSpec overload of the same name is the replacement.
def newTaskTempFileAbsPath(
    taskContext: TaskAttemptContext,
    absoluteDir: String,
    ext: String): String

/**
* Similar to newTaskTempFile(), but allows files to be committed to an absolute output location.
* Depending on the implementation, there may be weaker guarantees around adding files this way.
*
* The "absoluteDir" parameter specifies the final absolute directory of the file. The "spec"
* parameter specifies the file name. The rest are left to the commit protocol implementation to
* decide.
*
* Important: it is the caller's responsibility to add uniquely identifying content to "spec"
* if a task is going to write out multiple files to the same dir. The file commit protocol only
* guarantees that files written by different tasks will not conflict.
*
* This API should be implemented and called instead of the deprecated
* [[newTaskTempFileAbsPath(taskContext, absoluteDir, ext)]]. A default implementation is
* provided here to stay backward compatible with custom [[FileCommitProtocol]] implementations
* before Spark 3.2.0.
*/
def newTaskTempFileAbsPath(
    taskContext: TaskAttemptContext,
    absoluteDir: String,
    spec: FileNameSpec): String = {
  val requestedPrefix = spec.prefix
  // Reject prefixed names up front: the extension-only overload used below has no way to
  // receive a prefix.
  if (!requestedPrefix.isEmpty) {
    val protocolName = getClass.getSimpleName
    throw new UnsupportedOperationException(
      s"$protocolName.newTaskTempFileAbsPath does not support file name prefix: $requestedPrefix")
  }
  // Prefix is empty, so only the extension needs to be forwarded.
  newTaskTempFileAbsPath(taskContext, absoluteDir, spec.ext)
}

/**
* Commits a task after the writes succeed. Must be called on the executors when running tasks.
*/
Expand Down Expand Up @@ -174,3 +233,12 @@ object FileCommitProtocol extends Logging {
new Path(path, ".spark-staging-" + jobId)
}
}

/**
 * Describes how the name of a Spark output file is composed.
 * [[FileCommitProtocol]] uses this specification to create the full path of a file.
 *
 * @param prefix Prefix of the file.
 * @param ext Extension of the file.
 */
final case class FileNameSpec(
    prefix: String,
    ext: String)

0 comments on commit 8cc4899

Please sign in to comment.