Commit d9c82b1
Fix issue with CSV
revans2 committed Mar 7, 2024
1 parent 98571d4 commit d9c82b1
Showing 4 changed files with 21 additions and 15 deletions.
@@ -419,20 +419,21 @@ class CSVPartitionReader(
    * Read the host buffer to GPU table
    *
    * @param dataBufferer buffered data to be parsed
-   * @param cudfSchema the cudf schema of the data
+   * @param cudfDataSchema the cudf schema of the data
    * @param readDataSchema the Spark schema describing what will be read
    * @param isFirstChunk if it is the first chunk
    * @return table
    */
   override def readToTable(
       dataBufferer: HostLineBufferer,
-      cudfSchema: Schema,
+      cudfDataSchema: Schema,
       readDataSchema: StructType,
+      cudfReadDataSchema: Schema,
       isFirstChunk: Boolean,
       decodeTime: GpuMetric): Table = {
     val hasHeader = isFirstChunk && parsedOptions.headerFlag
     val csvOpts = buildCsvOptions(parsedOptions, readDataSchema, hasHeader)
-    CSVPartitionReader.readToTable(dataBufferer, cudfSchema, decodeTime, csvOpts,
+    CSVPartitionReader.readToTable(dataBufferer, cudfDataSchema, decodeTime, csvOpts,
       getFileFormatShortName, partFile)
   }
 }
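
Note: even after this change the CSV path hands the full-file schema (now named cudfDataSchema) to the cudf parser, while JSON below switches to the pruned cudfReadDataSchema. A minimal sketch of why, in plain Scala with a hypothetical SimpleSchema standing in for cudf's Schema (these helper names are illustrative, not from the commit): CSV columns are positional, so the parse schema has to describe every column in the file even when only a few are read.

object CsvSchemaSketch {
  // Illustrative stand-in for a cudf Schema: just ordered column names.
  final case class SimpleSchema(columnNames: Seq[String])

  // Parse one CSV line against the full data schema, then keep only the read columns.
  def readRow(line: String, dataSchema: SimpleSchema,
      readSchema: SimpleSchema): Map[String, String] = {
    val cells = line.split(',')
    val byName = dataSchema.columnNames.zip(cells).toMap
    readSchema.columnNames.flatMap(n => byName.get(n).map(n -> _)).toMap
  }

  def main(args: Array[String]): Unit = {
    val dataSchema = SimpleSchema(Seq("a", "b", "c"))
    val readSchema = SimpleSchema(Seq("c"))
    // Dropping "a" and "b" from the parse schema would shift "c" to position 0.
    println(readRow("1,2,3", dataSchema, readSchema)) // Map(c -> 3)
  }
}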
@@ -478,14 +478,15 @@ abstract class GpuTextBasedPartitionReader[BUFF <: LineBufferer, FACT <: LineBuf
       readDataSchema
     }

-    val cudfSchema = getCudfSchema(newReadDataSchema)
+    val cudfSchema = getCudfSchema(dataSchema)
+    val cudfReadSchema = getCudfSchema(newReadDataSchema)

     // about to start using the GPU
     GpuSemaphore.acquireIfNecessary(TaskContext.get())

     // The buffer that is sent down
-    val table = readToTable(dataBuffer, cudfSchema, newReadDataSchema, isFirstChunk,
-      metrics(GPU_DECODE_TIME))
+    val table = readToTable(dataBuffer, cudfSchema, newReadDataSchema, cudfReadSchema,
+      isFirstChunk, metrics(GPU_DECODE_TIME))

     // parse boolean and numeric columns that were read as strings
     val castTable = withResource(table) { _ =>
@@ -552,15 +553,17 @@ abstract class GpuTextBasedPartitionReader[BUFF <: LineBufferer, FACT <: LineBuf
   /**
    * Read the host buffer to GPU table
    * @param dataBuffer where the data is buffered
-   * @param cudfSchema the cudf schema of the data
+   * @param cudfDataSchema the cudf schema of the data
    * @param readDataSchema the Spark schema describing what will be read
+   * @param cudfReadDataSchema the cudf schema of just the data we want to read.
    * @param isFirstChunk if it is the first chunk
    * @return table
    */
   def readToTable(
       dataBuffer: BUFF,
-      cudfSchema: Schema,
+      cudfDataSchema: Schema,
       readDataSchema: StructType,
+      cudfReadDataSchema: Schema,
       isFirstChunk: Boolean,
       decodeTime: GpuMetric): Table

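Note: GpuTextBasedPartitionReader now converts two schemas, dataSchema (every column in the file) and newReadDataSchema (only what Spark asked for), and threads both through the abstract readToTable above. A rough sketch of that flow under simplified stand-in types (SparkSchema and CudfSchema here are illustrative models, not the real Spark or cudf classes):

object BaseReaderSketch {
  final case class SparkField(name: String, dataType: String)
  final case class SparkSchema(fields: Seq[SparkField])
  final case class CudfSchema(columnNames: Seq[String])

  // Stand-in for getCudfSchema: keep the column names, drop Spark-only details.
  def getCudfSchema(s: SparkSchema): CudfSchema = CudfSchema(s.fields.map(_.name))

  // Stand-in for the abstract readToTable that each format reader overrides.
  def readToTable(cudfDataSchema: CudfSchema, cudfReadDataSchema: CudfSchema): Unit =
    println(s"parse with ${cudfDataSchema.columnNames}, keep ${cudfReadDataSchema.columnNames}")

  def main(args: Array[String]): Unit = {
    val dataSchema = SparkSchema(Seq(SparkField("a", "int"), SparkField("b", "string")))
    val newReadDataSchema = SparkSchema(Seq(SparkField("b", "string")))
    // The base class now converts both schemas and passes both downstream.
    readToTable(getCudfSchema(dataSchema), getCudfSchema(newReadDataSchema))
  }
}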
@@ -360,22 +360,23 @@ class JsonPartitionReader(
    *
    * @param dataBuffer host buffer to be read
    * @param dataSize the size of host buffer
-   * @param cudfSchema the cudf schema of the data
+   * @param cudfDataSchema the cudf schema of the data
    * @param readDataSchema the Spark schema describing what will be read
    * @param hasHeader if it has header
    * @return table
    */
   override def readToTable(
       dataBufferer: HostLineBufferer,
-      cudfSchema: Schema,
+      cudfDataSchema: Schema,
       readDataSchema: StructType,
+      cudfReadDataSchema: Schema,
       hasHeader: Boolean,
       decodeTime: GpuMetric): Table = {
     val jsonOpts = buildJsonOptions(parsedOptions)
-    val jsonTbl = JsonPartitionReader.readToTable(dataBufferer, cudfSchema, decodeTime, jsonOpts,
-      getFileFormatShortName, partFile)
+    val jsonTbl = JsonPartitionReader.readToTable(dataBufferer, cudfReadDataSchema, decodeTime,
+      jsonOpts, getFileFormatShortName, partFile)
     withResource(jsonTbl) { tbl =>
-      val cudfColumnNames = cudfSchema.getColumnNames
+      val cudfColumnNames = cudfReadDataSchema.getColumnNames
       val columns = readDataSchema.map { field =>
         val i = cudfColumnNames.indexOf(field.name)
         if (i == -1) {
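Note: JsonPartitionReader now parses with the pruned cudfReadDataSchema and then matches columns back to readDataSchema by name, substituting null columns for fields the file never contained. A self-contained sketch of that reorder step (plain Scala; the Map of vectors is an illustrative stand-in for a cudf Table):

object JsonReorderSketch {
  // Reorder parsed columns to match readDataSchema; missing fields become null columns.
  def reorder(cudfColumnNames: Seq[String], readDataSchema: Seq[String],
      table: Map[String, Vector[Option[Int]]], numRows: Int): Seq[Vector[Option[Int]]] =
    readDataSchema.map { name =>
      val i = cudfColumnNames.indexOf(name)
      if (i == -1) Vector.fill(numRows)(None) // field not in the file: all nulls
      else table(name)
    }

  def main(args: Array[String]): Unit = {
    val parsed: Map[String, Vector[Option[Int]]] =
      Map("b" -> Vector(Some(2)), "a" -> Vector(Some(1)))
    println(reorder(Seq("b", "a"), Seq("a", "missing"), parsed, numRows = 1))
    // List(Vector(Some(1)), Vector(None))
  }
}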
@@ -486,8 +486,9 @@ class GpuHiveDelimitedTextPartitionReader(conf: Configuration,
     maxBytesPerChunk, execMetrics, HostStringColBuffererFactory) {

   override def readToTable(dataBufferer: HostStringColBufferer,
-      inputFileCudfSchema: Schema,
+      cudfDataSchema: Schema,
       requestedOutputDataSchema: StructType,
+      cudfReadDataSchema: Schema,
       isFirstChunk: Boolean,
       decodeTime: GpuMetric): Table = {
     withResource(new NvtxWithMetrics(getFileFormatShortName + " decode",
@@ -515,7 +516,7 @@ class GpuHiveDelimitedTextPartitionReader(conf: Configuration,
       withResource(Scalar.fromNull(DType.STRING)) { nullVal =>
         // This is a bit different because we are dropping columns/etc ourselves
         val requiredColumnSequence = requestedOutputDataSchema.map(_.name).toList
-        val outputColumnNames = inputFileCudfSchema.getColumnNames
+        val outputColumnNames = cudfDataSchema.getColumnNames
         val reorderedColumns = requiredColumnSequence.safeMap { colName =>
           val colIndex = outputColumnNames.indexOf(colName)
           if (splitTable.getNumberOfColumns > colIndex) {
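
Note: the Hive delimited-text reader keeps resolving column names against the full-file schema (renamed to cudfDataSchema) because it drops and reorders columns itself after splitting rows, falling back to a null column when the split table is too narrow. A simplified model of that guard (plain Scala; splitTable stands in for the real cudf Table of split string columns):

object HiveReorderSketch {
  // Pick required columns by name from the split table; anything the split did not
  // produce comes back as a null column with the right row count.
  def reorderedColumns(requiredColumnSequence: List[String], outputColumnNames: Seq[String],
      splitTable: Vector[Vector[String]]): List[Vector[Option[String]]] = {
    val numRows = splitTable.headOption.map(_.length).getOrElse(0)
    requiredColumnSequence.map { colName =>
      val colIndex = outputColumnNames.indexOf(colName)
      if (colIndex >= 0 && colIndex < splitTable.length) splitTable(colIndex).map(Option(_))
      else Vector.fill(numRows)(None)
    }
  }

  def main(args: Array[String]): Unit = {
    // The file schema says columns a,b exist, but this chunk only split out column a.
    println(reorderedColumns(List("b"), Seq("a", "b"), Vector(Vector("x"))))
    // List(Vector(None)): "b" is past the end of the split table, so it becomes nulls
  }
}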
