diff --git a/sql-plugin/src/main/java/com/nvidia/spark/rapids/ColumnCastUtil.scala b/sql-plugin/src/main/java/com/nvidia/spark/rapids/ColumnCastUtil.scala
index 927b457f180..f2734f509f9 100644
--- a/sql-plugin/src/main/java/com/nvidia/spark/rapids/ColumnCastUtil.scala
+++ b/sql-plugin/src/main/java/com/nvidia/spark/rapids/ColumnCastUtil.scala
@@ -22,7 +22,7 @@ import scala.collection.mutable.{ArrayBuffer, ArrayBuilder}
 
 import ai.rapids.cudf.{ColumnVector, ColumnView, DType}
 
-import org.apache.spark.sql.types.{ArrayType, DataType, StructType}
+import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructField, StructType}
 
 /**
  * This class casts a column to another column if the predicate passed resolves to true.
@@ -191,6 +191,26 @@ object ColumnCastUtil extends Arm {
        } else {
          cv
        }
+      case m: MapType =>
+        // a map is stored as a list of key/value structs
+        val struct = cv.getChildColumnView(0)
+        toClose += struct
+
+        if (cv.getType != DType.LIST || struct.getType != DType.STRUCT) {
+          throw new IllegalStateException("Map should be List(Structure) in column view")
+        }
+
+        val newChild = convertTypeAToTypeB(struct,
+          StructType(Array(StructField("", m.keyType), StructField("", m.valueType))),
+          predicate, toClose)
+
+        if (struct == newChild) {
+          cv
+        } else {
+          val newView = cv.replaceListChild(newChild)
+          toClose += newView
+          newView
+        }
      case _ =>
        if (predicate(dataType, cv)) {
          val col = convert(dataType, cv)
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScanBase.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScanBase.scala
index 54cbd2a581c..8cc0f334a29 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScanBase.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScanBase.scala
@@ -613,10 +613,12 @@ trait ParquetPartitionReaderBase extends Logging with Arm with ScanWithMetrics
 
      val precisions = getPrecisionsList(clippedSchema.asGroupType().getFields.asScala)
      // check if there are cols with precision that can be stored in an int
-      val typeCastingNeeded = precisions.exists(p => p <= Decimal.MAX_INT_DIGITS)
-      if (readDataSchema.length > inputTable.getNumberOfColumns || typeCastingNeeded) {
+      val hasDecimalAsInt = precisions.exists(p => p <= Decimal.MAX_INT_DIGITS)
+      val hasUnsignedType = existsUnsignedType(clippedSchema.asGroupType())
+      if (readDataSchema.length > inputTable.getNumberOfColumns
+          || hasDecimalAsInt || hasUnsignedType) {
        // Spark+Parquet schema evolution is relatively simple with only adding/removing columns
-        // To type casting or anyting like that
+        // No type casting or anything like that
        val clippedGroups = clippedSchema.asGroupType()
        val newColumns = new Array[ColumnVector](readDataSchema.length)
        try {
@@ -626,12 +628,10 @@ trait ParquetPartitionReaderBase extends Logging with Arm with ScanWithMetrics
            val readField = readDataSchema(writeAt)
            if (areNamesEquiv(clippedGroups, readAt, readField.name, isSchemaCaseSensitive)) {
              val origCol = table.getColumn(readAt)
-              val col: ColumnVector = if (typeCastingNeeded) {
+              val col: ColumnVector = if (hasDecimalAsInt || hasUnsignedType) {
                ColumnCastUtil.ifTrueThenDeepConvertTypeAtoTypeB(origCol, readField.dataType,
-                  (dt, cv) => cv.getType.isDecimalType &&
-                    !GpuColumnVector.getNonNestedRapidsType(dt).equals(cv.getType()),
-                  (dt, cv) =>
-                    cv.castTo(DecimalUtil.createCudfDecimal(dt.asInstanceOf[DecimalType])))
+                  (dt, cv) => needDecimalCast(cv, dt) || needUnsignedToSignedCast(cv, dt),
+                  (dt, cv) => decimalCastOrUnsignedCast(cv, dt))
              } else {
                origCol.incRefCount()
              }
@@ -656,6 +656,54 @@ trait ParquetPartitionReaderBase extends Logging with Arm with ScanWithMetrics
    }
  }
 
+  /**
+   * Check if cudf unsigned integer types need converting to the wider signed types Spark expects.
+   * Since Spark 3.2.0, Spark reads uint8 as int16, uint16 as int32 and uint32 as int64.
+   * TODO: uint64 -> Decimal(20, 0) depends on cuDF, see issue #3475
+   *
+   * @param group the Parquet schema
+   * @return true if the schema contains an unsigned integer type
+   */
+  def existsUnsignedType(group: GroupType): Boolean = {
+    group.getFields.asScala.exists(
+      field => {
+        if (field.isPrimitive) {
+          val t = field.getOriginalType
+          (t == OriginalType.UINT_8) || (t == OriginalType.UINT_16) ||
+            (t == OriginalType.UINT_32)
+        } else {
+          existsUnsignedType(field.asGroupType)
+        }
+      }
+    )
+  }
+
+  def needDecimalCast(cv: ColumnView, dt: DataType): Boolean = {
+    cv.getType.isDecimalType && !GpuColumnVector.getNonNestedRapidsType(dt).equals(cv.getType())
+  }
+
+  def needUnsignedToSignedCast(cv: ColumnView, dt: DataType): Boolean = {
+    (cv.getType.equals(DType.UINT8) && dt.isInstanceOf[ShortType]) ||
+      (cv.getType.equals(DType.UINT16) && dt.isInstanceOf[IntegerType]) ||
+      (cv.getType.equals(DType.UINT32) && dt.isInstanceOf[LongType])
+  }
+
+  // Performs the cast when the needDecimalCast or needUnsignedToSignedCast test is true
+  // in ColumnCastUtil.ifTrueThenDeepConvertTypeAtoTypeB.
+  // Note: the unsigned-to-signed behavior is decided by Spark,
+  // which means the parameter dt comes from the Spark meta module.
+  // This implements the requested type behavior accordingly on the GPU.
+  // It is suitable for all Spark versions, so there is no need to add it to the shim layer.
+  private def decimalCastOrUnsignedCast(cv: ColumnView, dt: DataType): ColumnView = {
+    if (needDecimalCast(cv, dt)) {
+      cv.castTo(DecimalUtil.createCudfDecimal(dt.asInstanceOf[DecimalType]))
+    } else if (needUnsignedToSignedCast(cv, dt)) {
+      cv.castTo(DType.create(GpuColumnVector.getNonNestedRapidsType(dt).getTypeId))
+    } else {
+      throw new IllegalStateException("Logical error: should only be " +
+        "decimal cast or unsigned to signed cast")
+    }
+  }
+
  protected def readPartFile(
      blocks: Seq[BlockMetaData],
      clippedSchema: MessageType,
@@ -1362,3 +1410,4 @@ object ParquetPartitionReader {
    })
  }
 }
+
diff --git a/tests/src/test/resources/unsigned-int.parquet b/tests/src/test/resources/unsigned-int.parquet
new file mode 100644
index 00000000000..62fbcc9f798
Binary files /dev/null and b/tests/src/test/resources/unsigned-int.parquet differ
diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/ParquetScanSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/ParquetScanSuite.scala
index d175f7ad5eb..b4b89059a23 100644
--- a/tests/src/test/scala/com/nvidia/spark/rapids/ParquetScanSuite.scala
+++ b/tests/src/test/scala/com/nvidia/spark/rapids/ParquetScanSuite.scala
@@ -90,4 +90,56 @@ class ParquetScanSuite extends SparkQueryCompareTestSuite {
    frameFromParquetWithSchema("disorder-read-schema.parquet", StructType(Seq(
      StructField("c3_long", LongType),
      StructField("c1_int", IntegerType))))) { frame => frame }
+
+  /**
+   * The column schema of unsigned-int.parquet is:
+   * TODO: nested array types are blocked by issue: https://github.com/rapidsai/cudf/issues/9240
+   *
+   * message root {
+   *   required int32 a (UINT_8);
+   *   required int32 b (UINT_16);
+   *   required int32 c (UINT_32);
+   *   required group g {
+   *     required int32 c1 (UINT_8);
+   *     required int32 c2 (UINT_16);
+   *     required int32 c3 (UINT_32);
+   *   }
+   *   required group m1 (MAP) {
+   *     repeated group key_value {
+   *       required int32 key (UINT_8);
+   *       optional int32 value (UINT_8);
+   *     }
+   *   }
+   *   required group m2 (MAP) {
+   *     repeated group key_value {
+   *       required int32 key (UINT_16);
+   *       optional int32 value (UINT_16);
+   *     }
+   *   }
+   *   required group m3 (MAP) {
+   *     repeated group key_value {
+   *       required int32 key (UINT_32);
+   *       optional int32 value (UINT_32);
+   *     }
+   *   }
+   *   optional group m4 (MAP) {
+   *     repeated group key_value {
+   *       required int32 key (UINT_32);
+   *       required group value {
+   *         required int32 c1 (UINT_8);
+   *         required int32 c2 (UINT_16);
+   *         required int32 c3 (UINT_32);
+   *       }
+   *     }
+   *   }
+   * }
+   *
+   */
+  testSparkResultsAreEqual("Test Parquet unsigned int: uint8, uint16, uint32",
+    frameFromParquet("unsigned-int.parquet"),
+    // CPU version throws an exception when Spark < 3.2, so skip when Spark < 3.2.
+    // The exception is like "Parquet type not supported: INT32 (UINT_8)"
+    assumeCondition = (_ => (VersionUtils.isSpark320OrLater, "Spark version not 3.2.0+"))) {
+    frame => frame.select(col("*"))
+  }
 }
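
Illustrative sketch (not part of this patch): assuming a Spark 3.2.0+ session (per the test's assumeCondition) and the new test resource available on the local filesystem, the unsigned Parquet columns a, b and c should surface as the wider signed Spark types that existsUnsignedType/decimalCastOrUnsignedCast widen to on the GPU. The local-mode session, object name and relative path below are assumptions made only for this example.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{IntegerType, LongType, ShortType}

object UnsignedIntReadSketch {
  def main(args: Array[String]): Unit = {
    // Local session just for the sketch; the plugin tests get their session from the test harness.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("unsigned-int-sketch")
      .getOrCreate()
    // Path assumed relative to the repo root (tests/src/test/resources/unsigned-int.parquet).
    val df = spark.read.parquet("tests/src/test/resources/unsigned-int.parquet")
    // Expected widening, per the comment on existsUnsignedType:
    //   UINT_8 -> ShortType, UINT_16 -> IntegerType, UINT_32 -> LongType
    assert(df.schema("a").dataType == ShortType)
    assert(df.schema("b").dataType == IntegerType)
    assert(df.schema("c").dataType == LongType)
    df.show()
    spark.stop()
  }
}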