Skip to content

Commit

Permalink
Create Hdfs tmp path in gluten side when writing hdfs file
Browse files Browse the repository at this point in the history
  • Loading branch information
JkSelf committed Oct 18, 2023
1 parent 353c45a commit c24aefc
Showing 1 changed file with 14 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import org.apache.spark.util.TaskResources

import com.google.common.base.Preconditions
import org.apache.arrow.c.ArrowSchema
import org.apache.hadoop.fs.FileStatus
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
import org.apache.hadoop.mapreduce.TaskAttemptContext

import java.io.IOException
Expand All @@ -45,7 +45,16 @@ trait VeloxFormatWriterInjects extends GlutenFormatWriterInjectsBase {
dataSchema: StructType,
context: TaskAttemptContext,
nativeConf: java.util.Map[String, String]): OutputWriter = {
val originPath = path

// Create the HDFS path if it does not already exist.
val hdfsSchema = "hdfs://"
if (path.startsWith(hdfsSchema)) {
val fs = FileSystem.get(context.getConfiguration)
val hdfsPath = new Path(path)
if (!fs.exists(hdfsPath)) {
fs.mkdirs(hdfsPath)
}
}

val arrowSchema =
SparkArrowUtil.toArrowSchema(dataSchema, SQLConf.get.sessionLocalTimeZone)
Expand All @@ -56,7 +65,7 @@ trait VeloxFormatWriterInjects extends GlutenFormatWriterInjectsBase {
try {
ArrowAbiUtil.exportSchema(allocator, arrowSchema, cSchema)
dsHandle = datasourceJniWrapper.nativeInitDatasource(
originPath,
path,
cSchema.memoryAddress(),
NativeMemoryManagers.contextInstance("VeloxWriter").getNativeInstanceHandle,
nativeConf)
Expand All @@ -74,7 +83,7 @@ trait VeloxFormatWriterInjects extends GlutenFormatWriterInjectsBase {
arrowSchema,
allocator,
datasourceJniWrapper,
originPath)
path)

new OutputWriter {
override def write(row: InternalRow): Unit = {
Expand All @@ -91,7 +100,7 @@ trait VeloxFormatWriterInjects extends GlutenFormatWriterInjectsBase {

// Do NOT add override keyword for compatibility on spark 3.1.
def path(): String = {
originPath
path
}
}
}
Expand Down

0 comments on commit c24aefc

Please sign in to comment.