Create HDFS folder on the Gluten side
JkSelf committed Oct 17, 2023
1 parent 91bfa58 commit 9113611
Showing 9 changed files with 69 additions and 13 deletions.
@@ -18,13 +18,13 @@ package org.apache.spark.sql.execution.datasources.v1

import io.glutenproject.execution.datasource.GlutenRowSplitter
import io.glutenproject.vectorized.CHColumnVector

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.orc.OrcUtils
import org.apache.spark.sql.types.StructType

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileStatus
import org.apache.hadoop.mapreduce.TaskAttemptContext

@@ -34,7 +34,8 @@ trait CHFormatWriterInjects extends GlutenFormatWriterInjectsBase {
path: String,
dataSchema: StructType,
context: TaskAttemptContext,
nativeConf: java.util.Map[String, String]): OutputWriter = {
nativeConf: java.util.Map[String, String],
hadoopConfs: Configuration = null): OutputWriter = {
val originPath = path
val datasourceJniWrapper = new CHDatasourceJniWrapper();
val instance =
@@ -34,7 +34,8 @@ import org.apache.spark.util.TaskResources

import com.google.common.base.Preconditions
import org.apache.arrow.c.ArrowSchema
import org.apache.hadoop.fs.FileStatus
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
import org.apache.hadoop.mapreduce.TaskAttemptContext

import java.io.IOException
@@ -44,9 +45,19 @@ trait VeloxFormatWriterInjects extends GlutenFormatWriterInjectsBase {
path: String,
dataSchema: StructType,
context: TaskAttemptContext,
nativeConf: java.util.Map[String, String]): OutputWriter = {
nativeConf: java.util.Map[String, String],
hadoopConfs: Configuration): OutputWriter = {
val originPath = path

// Create the HDFS output path if it does not already exist.
if (path.substring(0, 5).equals("hdfs:")) {
val fs = FileSystem.get(hadoopConfs)
val hdfsPath = new Path(originPath)
if (!fs.exists(hdfsPath)) {
fs.mkdirs(hdfsPath)
}
}

val arrowSchema =
SparkArrowUtil.toArrowSchema(dataSchema, SQLConf.get.sessionLocalTimeZone)
val cSchema = ArrowSchema.allocateNew(ArrowBufferAllocators.contextInstance())
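The directory-creation logic above keys off a raw string prefix ("hdfs:") and always resolves the default filesystem via FileSystem.get(hadoopConfs). For comparison, a minimal scheme-agnostic sketch using the standard Hadoop API is shown below; it is an illustration only, not code from this commit, and the helper name ensureOutputDir is hypothetical:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Resolve the filesystem that owns `path` (hdfs://, file://, s3a://, ...)
// from the path itself, then create the directory if it is missing.
def ensureOutputDir(path: String, hadoopConf: Configuration): Unit = {
  val outputPath = new Path(path)
  val fs = outputPath.getFileSystem(hadoopConf)
  if (!fs.exists(outputPath)) {
    fs.mkdirs(outputPath)
  }
}

Path.getFileSystem picks the FileSystem implementation from the path's scheme, so the same code covers HDFS and local paths without a prefix check.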
@@ -25,6 +25,7 @@ import org.apache.spark.sql.execution.datasources.FakeRow
import org.apache.spark.sql.execution.datasources.OutputWriter
import org.apache.spark.sql.types.StructType

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileStatus
import org.apache.hadoop.mapreduce.TaskAttemptContext

@@ -33,7 +34,8 @@ trait GlutenFormatWriterInjects {
path: String,
dataSchema: StructType,
context: TaskAttemptContext,
nativeConf: java.util.Map[String, String]): OutputWriter
nativeConf: java.util.Map[String, String],
hadoopConfs: Configuration = null): OutputWriter

def inferSchema(
sparkSession: SparkSession,
@@ -32,6 +32,7 @@ import org.apache.spark.util.{SerializableConfiguration, Utils}

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce._
import org.apache.hadoop.mapreduce.lib.input.FileSplit
@@ -97,6 +98,7 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
job: Job,
options: Map[String, String],
dataSchema: StructType): OutputWriterFactory = {

val orcOptions = new OrcOptions(options, sparkSession.sessionState.conf)

val conf = job.getConfiguration
@@ -132,7 +134,12 @@
context: TaskAttemptContext): OutputWriter = {
GlutenOrcWriterInjects
.getInstance()
.createOutputWriter(path, dataSchema, context, nativeConf);
.createOutputWriter(
path,
dataSchema,
context,
nativeConf,
sparkSession.sparkContext.hadoopConfiguration);

}
}
@@ -105,7 +105,12 @@ class ParquetFileFormat extends FileFormat with DataSourceRegister with Logging
context: TaskAttemptContext): OutputWriter = {
GlutenParquetWriterInjects
.getInstance()
.createOutputWriter(path, dataSchema, context, nativeConf);
.createOutputWriter(
path,
dataSchema,
context,
nativeConf,
sparkSession.sparkContext.hadoopConfiguration);

}
}
@@ -138,11 +138,21 @@ class HiveFileFormat(fileSinkConf: FileSinkDesc)
if (isParquetFormat) {
GlutenParquetWriterInjects
.getInstance()
.createOutputWriter(path, dataSchema, context, nativeConf);
.createOutputWriter(
path,
dataSchema,
context,
nativeConf,
sparkSession.sparkContext.hadoopConfiguration);
} else {
GlutenOrcWriterInjects
.getInstance()
.createOutputWriter(path, dataSchema, context, nativeConf);
.createOutputWriter(
path,
dataSchema,
context,
nativeConf,
sparkSession.sparkContext.hadoopConfiguration);
}
}
}
@@ -111,7 +111,12 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
context: TaskAttemptContext): OutputWriter = {
GlutenOrcWriterInjects
.getInstance()
.createOutputWriter(path, dataSchema, context, nativeConf);
.createOutputWriter(
path,
dataSchema,
context,
nativeConf,
sparkSession.sparkContext.hadoopConfiguration);

}
}
@@ -97,7 +97,12 @@ class ParquetFileFormat extends FileFormat with DataSourceRegister with Logging
context: TaskAttemptContext): OutputWriter = {
GlutenParquetWriterInjects
.getInstance()
.createOutputWriter(path, dataSchema, context, nativeConf);
.createOutputWriter(
path,
dataSchema,
context,
nativeConf,
sparkSession.sparkContext.hadoopConfiguration);

}
}
@@ -135,11 +135,21 @@ class HiveFileFormat(fileSinkConf: FileSinkDesc)
if (isParquetFormat) {
GlutenParquetWriterInjects
.getInstance()
.createOutputWriter(path, dataSchema, context, nativeConf);
.createOutputWriter(
path,
dataSchema,
context,
nativeConf,
sparkSession.sparkContext.hadoopConfiguration);
} else {
GlutenOrcWriterInjects
.getInstance()
.createOutputWriter(path, dataSchema, context, nativeConf);
.createOutputWriter(
path,
dataSchema,
context,
nativeConf,
sparkSession.sparkContext.hadoopConfiguration);
}
}
}
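With the Hadoop configuration now threaded through createOutputWriter, the Velox writer can create a missing HDFS output directory itself before writing. A minimal usage sketch follows, assuming an existing SparkSession named spark; the namenode address and output path are placeholders, not values from this commit:

// The writer creates hdfs://.../gluten-out/ids if it does not already exist.
val df = spark.range(100).toDF("id")
df.write
  .mode("overwrite")
  .parquet("hdfs://namenode:8020/tmp/gluten-out/ids")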