Parquet read unsigned int: uint8, uint16, uint32 #3438

Merged · 24 commits · Sep 23, 2021
@@ -610,13 +610,14 @@ trait ParquetPartitionReaderBase extends Logging with Arm with ScanWithMetrics
inputTable: Table,
filePath: String,
clippedSchema: MessageType): Table = {

val precisions = getPrecisionsList(clippedSchema.asGroupType().getFields.asScala)
// check if there are cols with precision that can be stored in an int
-    val typeCastingNeeded = precisions.exists(p => p <= Decimal.MAX_INT_DIGITS)
-    if (readDataSchema.length > inputTable.getNumberOfColumns || typeCastingNeeded) {
+    val decimalCastingNeeded = precisions.exists(p => p <= Decimal.MAX_INT_DIGITS)
+    val unsignedCastingNeeded = existsUnsignedType(clippedSchema.asGroupType())
+    if (readDataSchema.length > inputTable.getNumberOfColumns
+        || decimalCastingNeeded || unsignedCastingNeeded) {
// Spark+Parquet schema evolution is relatively simple with only adding/removing columns
-      // To type casting or anyting like that
+      // no type casting or anything like that
val clippedGroups = clippedSchema.asGroupType()
val newColumns = new Array[ColumnVector](readDataSchema.length)
try {
@@ -626,15 +627,28 @@ trait ParquetPartitionReaderBase extends Logging with Arm with ScanWithMetrics
val readField = readDataSchema(writeAt)
if (areNamesEquiv(clippedGroups, readAt, readField.name, isSchemaCaseSensitive)) {
val origCol = table.getColumn(readAt)
-          val col: ColumnVector = if (typeCastingNeeded) {
-            ColumnCastUtil.ifTrueThenDeepConvertTypeAtoTypeB(origCol, readField.dataType,
-              (dt, cv) => cv.getType.isDecimalType &&
-                !GpuColumnVector.getNonNestedRapidsType(dt).equals(cv.getType()),
-              (dt, cv) =>
-                cv.castTo(DecimalUtil.createCudfDecimal(dt.asInstanceOf[DecimalType])))

+          var col: ColumnVector = origCol
+          if (!(decimalCastingNeeded || unsignedCastingNeeded)) {
+            col = origCol.incRefCount()
+          } else {
+            origCol.incRefCount()
+            if (decimalCastingNeeded) {
+              col = ColumnCastUtil.ifTrueThenDeepConvertTypeAtoTypeB(
+                col, readField.dataType,
+                (dt, cv) => cv.getType.isDecimalType &&
+                  !GpuColumnVector.getNonNestedRapidsType(dt).equals(cv.getType()),
+                (dt, cv) =>
+                  cv.castTo(DecimalUtil.createCudfDecimal(dt.asInstanceOf[DecimalType])))
+            }
+            if (unsignedCastingNeeded) {
+              col = ColumnCastUtil.ifTrueThenDeepConvertTypeAtoTypeB(
+                col, readField.dataType,
+                (dt, cv) => needUnsignedToSignedCast(cv, dt),
+                (dt, cv) =>
+                  cv.castTo(DType.create(GpuColumnVector.getNonNestedRapidsType(dt).getTypeId)))
+            }
+          }

newColumns(writeAt) = col
readAt += 1
} else {
@@ -656,6 +670,33 @@ trait ParquetPartitionReaderBase extends Logging with Arm with ScanWithMetrics
}
}

+  /**
+   * Checks whether the schema contains unsigned integer columns that need to be cast
+   * to the wider signed integers Parquet expects.
+   * Since Spark 3.2.0, Parquet reads uint8 as int16, uint16 as int32, and uint32 as int64.
+   * TODO: uint64 -> Decimal(20, 0) depends on cudf, see issue #3475
+   *
+   * @param group the schema
+   * @return true if the schema has an unsigned integer
+   */
+  def existsUnsignedType(group: GroupType): Boolean = {
+    group.getFields.asScala.exists { field =>
+      if (field.isPrimitive) {
+        val t = field.getOriginalType
+        (t == OriginalType.UINT_8) || (t == OriginalType.UINT_16) || (t == OriginalType.UINT_32)
+      } else {
+        existsUnsignedType(field.asGroupType)
+      }
+    }
+  }

+  def needUnsignedToSignedCast(cv: ColumnView, dataType: DataType): Boolean = {
+    (cv.getType.equals(DType.UINT8) && dataType.isInstanceOf[ShortType]) ||
+      (cv.getType.equals(DType.UINT16) && dataType.isInstanceOf[IntegerType]) ||
+      (cv.getType.equals(DType.UINT32) && dataType.isInstanceOf[LongType])
+  }

protected def readPartFile(
blocks: Seq[BlockMetaData],
clippedSchema: MessageType,
@@ -1362,3 +1403,4 @@ object ParquetPartitionReader {
})
}
}

Binary file added tests/src/test/resources/unsigned-int.parquet
Binary file not shown.
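For reference, the widening rule the new helpers encode can be sketched in a few lines of standalone Scala. This is an illustration only, not code from the PR; the object and method names (`UnsignedWidening`, `sparkTypeFor`) are hypothetical:

```scala
import org.apache.parquet.schema.OriginalType
import org.apache.spark.sql.types.{DataType, IntegerType, LongType, ShortType}

object UnsignedWidening {
  // Spark 3.2+ maps each Parquet unsigned annotation to the next wider signed type,
  // so the plugin casts the cudf unsigned column to that type before handing the
  // batch to Spark. Returns None for types the PR does not handle (e.g. UINT_64,
  // tracked by issue #3475).
  def sparkTypeFor(t: OriginalType): Option[DataType] = t match {
    case OriginalType.UINT_8  => Some(ShortType)   // uint8  -> int16
    case OriginalType.UINT_16 => Some(IntegerType) // uint16 -> int32
    case OriginalType.UINT_32 => Some(LongType)    // uint32 -> int64
    case _                    => None
  }
}
```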
@@ -90,4 +90,10 @@ class ParquetScanSuite extends SparkQueryCompareTestSuite {
frameFromParquetWithSchema("disorder-read-schema.parquet", StructType(Seq(
StructField("c3_long", LongType),
StructField("c1_int", IntegerType))))) { frame => frame }

+  testSparkResultsAreEqual("Test Parquet unsigned int: uint8, uint16, uint32",
+    frameFromParquet("unsigned-int.parquet"),
+    assumeCondition = (_ => (isSpark320OrLater, "Spark version not 3.2.0+"))) {
+    frame => frame.select(col("*"))
+  }
}
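To see the mapping this test exercises outside the test harness, the same resource can be read with vanilla Spark 3.2+ and its schema inspected. A minimal sketch, assuming a local SparkSession and that `unsigned-int.parquet` is reachable at the repository path shown:

```scala
import org.apache.spark.sql.SparkSession

object InspectUnsigned {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").getOrCreate()
    // On Spark 3.2+ the uint8/uint16/uint32 columns should print as
    // smallint/int/bigint, matching what the GPU reader must produce.
    spark.read.parquet("tests/src/test/resources/unsigned-int.parquet").printSchema()
    spark.stop()
  }
}
```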