From b7ff86f2c1775fab4ee7b0b9956708736860df59 Mon Sep 17 00:00:00 2001
From: Chong Gao
Date: Fri, 10 Sep 2021 12:01:24 +0800
Subject: [PATCH 01/19] parquet read unsigned int: uint8, uint16, uint32

Signed-off-by: Chong Gao
---
 .../nvidia/spark/rapids/GpuParquetScan.scala  | 72 +++++++++++++++++-
 tests/src/test/resources/unsigned-int.parquet | Bin 0 -> 6533 bytes
 .../spark/rapids/ParquetScanSuite.scala       |  5 ++
 3 files changed, 76 insertions(+), 1 deletion(-)
 create mode 100644 tests/src/test/resources/unsigned-int.parquet

diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala
index 54cbd2a581c..7a9d4adb8a6 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala
@@ -610,13 +610,16 @@ trait ParquetPartitionReaderBase extends Logging with Arm with ScanWithMetrics
       inputTable: Table,
       filePath: String,
       clippedSchema: MessageType): Table = {
+    convertToUnsigned(convertToDecimal(inputTable, filePath, clippedSchema))
+  }
 
+  private def convertToDecimal(inputTable: Table, filePath: String, clippedSchema: MessageType) = {
     val precisions = getPrecisionsList(clippedSchema.asGroupType().getFields.asScala)
     // check if there are cols with precision that can be stored in an int
     val typeCastingNeeded = precisions.exists(p => p <= Decimal.MAX_INT_DIGITS)
     if (readDataSchema.length > inputTable.getNumberOfColumns || typeCastingNeeded) {
       // Spark+Parquet schema evolution is relatively simple with only adding/removing columns
-      // To type casting or anyting like that
+      // To type casting or anything like that
       val clippedGroups = clippedSchema.asGroupType()
       val newColumns = new Array[ColumnVector](readDataSchema.length)
       try {
@@ -656,6 +659,73 @@ trait ParquetPartitionReaderBase extends Logging with Arm with ScanWithMetrics
     }
   }
 
+  /**
+   * Convert cudf unsigned integer to wider signed integer that parquet expects
+   * After spark 3.2.0, parquet read uint8 as int16, uint16 as int32, uint32 as int64
+   *
+   * @param inputTable
+   * @return
+   */
+  private def convertToUnsigned(inputTable: Table) = {
+    var typeCastingNeeded = needUnsignedCast(inputTable, readDataSchema)
+    if (typeCastingNeeded) {
+      val newColumns = new Array[ColumnVector](readDataSchema.length)
+      try {
+        withResource(inputTable) { table =>
+          (0 until readDataSchema.length).foreach(i => {
+            val readField = readDataSchema(i)
+            val origCol = table.getColumn(i)
+            val col: ColumnVector =
+              ColumnCastUtil.ifTrueThenDeepConvertTypeAtoTypeB(origCol, readField.dataType,
+                (dt, cv) => needUnsignedToSignedCastFunction(cv, dt),
+                (dt, cv) => cv.castTo(
+                  DType.create(GpuColumnVector.getNonNestedRapidsType(dt).getTypeId)))
+            newColumns(i) = col
+          })
+        }
+        new Table(newColumns: _*)
+      } finally {
+        newColumns.safeClose()
+      }
+    } else {
+      inputTable
+    }
+  }
+
+  def needUnsignedCast(table: Table, schema: StructType): Boolean = {
+    var b = false
+    for (i <- 0 until table.getNumberOfColumns) {
+      if (needUnsignedToSignedCast(table.getColumn(i), schema.fields(i).dataType)) {
+        b =true
+      }
+    }
+    b
+  }
+
+  def needUnsignedToSignedCast(cv: ColumnView, dataType: DataType): Boolean = {
+    dataType match {
+      case a: ArrayType =>
+        val child = cv.getChildColumnView(0)
+        return needUnsignedToSignedCast(child, a.elementType)
+      case s: StructType =>
+        for( i <- 0 until cv.getNumChildren) {
+          val child = cv.getChildColumnView(i)
+          if (needUnsignedToSignedCast(child, s(i).dataType)) {
+            return true
+          }
+        }
+      case _ =>
+        return needUnsignedToSignedCastFunction(cv, dataType)
+    }
+    return false
+  }
+
+  def needUnsignedToSignedCastFunction(cv: ColumnView, dataType: DataType): Boolean = {
+    (cv.getType.equals(DType.UINT8) && dataType.isInstanceOf[ShortType]) ||
+      (cv.getType.equals(DType.UINT16) && dataType.isInstanceOf[IntegerType]) ||
+      (cv.getType.equals(DType.UINT32) && dataType.isInstanceOf[LongType])
+  }
+
   protected def readPartFile(
       blocks: Seq[BlockMetaData],
       clippedSchema: MessageType,
diff --git a/tests/src/test/resources/unsigned-int.parquet b/tests/src/test/resources/unsigned-int.parquet
new file mode 100644
index 0000000000000000000000000000000000000000..978e8b8ef851e822a1767f6e54ee4517be184c7f
GIT binary patch
literal 6533
[base85 binary patch data omitted]

literal 0
HcmV?d00001

diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/ParquetScanSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/ParquetScanSuite.scala
index d175f7ad5eb..41e33261b20 100644
--- a/tests/src/test/scala/com/nvidia/spark/rapids/ParquetScanSuite.scala
+++ b/tests/src/test/scala/com/nvidia/spark/rapids/ParquetScanSuite.scala
@@ -90,4 +90,9 @@ class ParquetScanSuite extends SparkQueryCompareTestSuite {
     frameFromParquetWithSchema("disorder-read-schema.parquet", StructType(Seq(
       StructField("c3_long", LongType),
       StructField("c1_int", IntegerType))))) { frame => frame }
+
+  testSparkResultsAreEqual("Test Parquet unsigned int: uint8, uint16, uint32",
+    frameFromParquet("unsigned-int.parquet")) {
+    frame => frame.select(col("*"))
+  }
 }
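The widening that this first patch wires into the Parquet read path follows one fixed rule. As a standalone sketch (illustrative only, not code from the plugin), the mapping from Parquet unsigned logical types to the Spark types reported by Spark 3.2.0+ looks like this:

```scala
import org.apache.spark.sql.types._

// Each unsigned Parquet type widens to the next signed Spark type that can
// hold its full value range; uint64 has no integral home and is left out here.
object UnsignedWidening {
  def widen(parquetLogicalType: String): Option[DataType] = parquetLogicalType match {
    case "UINT_8"  => Some(ShortType)   // 0..255 fits in int16
    case "UINT_16" => Some(IntegerType) // 0..65535 fits in int32
    case "UINT_32" => Some(LongType)    // 0..4294967295 fits in int64
    case _         => None              // UINT_64 would need Decimal(20, 0)
  }
}
```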
From 3ead7cbc535ea2d1cc0fbb78bb377d53e28adf18 Mon Sep 17 00:00:00 2001
From: Chong Gao
Date: Fri, 10 Sep 2021 15:29:41 +0800
Subject: [PATCH 02/19] parquet read unsigned int: uint8, uint16, uint32

Signed-off-by: Chong Gao
---
 tests/src/test/resources/unsigned-int.parquet | Bin 6533 -> 6040 bytes
 1 file changed, 0 insertions(+), 0 deletions(-)

diff --git a/tests/src/test/resources/unsigned-int.parquet b/tests/src/test/resources/unsigned-int.parquet
index 978e8b8ef851e822a1767f6e54ee4517be184c7f..e2fd6f92b5fd703fe1e44adac1cd2aeefe141cf4 100644
GIT binary patch
delta 695
[base85 binary patch data omitted]

delta 1188
[base85 binary patch data omitted]

From c991312375de5b32b08b658e87f5ae680e53e798 Mon Sep 17 00:00:00
2001 From: Chong Gao Date: Mon, 13 Sep 2021 19:17:53 +0800 Subject: [PATCH 03/19] refactor Signed-off-by: Chong Gao --- .../nvidia/spark/rapids/GpuParquetScan.scala | 36 ++++++++----------- 1 file changed, 14 insertions(+), 22 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala index 7a9d4adb8a6..02a0a95c30d 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala @@ -610,10 +610,11 @@ trait ParquetPartitionReaderBase extends Logging with Arm with ScanWithMetrics inputTable: Table, filePath: String, clippedSchema: MessageType): Table = { - convertToUnsigned(convertToDecimal(inputTable, filePath, clippedSchema)) + convertToUnsigned(evolveSchemaIfNeededAndCloseImp(inputTable, filePath, clippedSchema)) } - private def convertToDecimal(inputTable: Table, filePath: String, clippedSchema: MessageType) = { + private def evolveSchemaIfNeededAndCloseImp( + inputTable: Table, filePath: String, clippedSchema: MessageType) = { val precisions = getPrecisionsList(clippedSchema.asGroupType().getFields.asScala) // check if there are cols with precision that can be stored in an int val typeCastingNeeded = precisions.exists(p => p <= Decimal.MAX_INT_DIGITS) @@ -663,11 +664,11 @@ trait ParquetPartitionReaderBase extends Logging with Arm with ScanWithMetrics * Convert cudf unsigned integer to wider signed integer that parquet expects * After spark 3.2.0, parquet read uint8 as int16, uint16 as int32, uint32 as int64 * - * @param inputTable - * @return + * @param inputTable the input table + * @return converted table if the input table has unsigned integer */ private def convertToUnsigned(inputTable: Table) = { - var typeCastingNeeded = needUnsignedCast(inputTable, readDataSchema) + val typeCastingNeeded = needUnsignedCast(inputTable, readDataSchema) if (typeCastingNeeded) { val newColumns = new Array[ColumnVector](readDataSchema.length) try { @@ -692,32 +693,23 @@ trait ParquetPartitionReaderBase extends Logging with Arm with ScanWithMetrics } } - def needUnsignedCast(table: Table, schema: StructType): Boolean = { - var b = false - for (i <- 0 until table.getNumberOfColumns) { - if (needUnsignedToSignedCast(table.getColumn(i), schema.fields(i).dataType)) { - b =true - } - } - b + def needUnsignedCast(table: Table, schema: StructType): Boolean = { + (0 until table.getNumberOfColumns).exists(i => + needUnsignedToSignedCast(table.getColumn(i), schema.fields(i).dataType)) } def needUnsignedToSignedCast(cv: ColumnView, dataType: DataType): Boolean = { dataType match { case a: ArrayType => val child = cv.getChildColumnView(0) - return needUnsignedToSignedCast(child, a.elementType) + needUnsignedToSignedCast(child, a.elementType) case s: StructType => - for( i <- 0 until cv.getNumChildren) { - val child = cv.getChildColumnView(i) - if (needUnsignedToSignedCast(child, s(i).dataType)) { - return true - } - } + (0 until cv.getNumChildren).exists(i => + needUnsignedToSignedCast(cv.getChildColumnView(i), s(i).dataType) + ) case _ => - return needUnsignedToSignedCastFunction(cv, dataType) + needUnsignedToSignedCastFunction(cv, dataType) } - return false } def needUnsignedToSignedCastFunction(cv: ColumnView, dataType: DataType): Boolean = { From 1e0ecb12db833cea714c3be6eccc464a8c7c53f3 Mon Sep 17 00:00:00 2001 From: Chong Gao Date: Mon, 13 Sep 2021 19:45:41 +0800 Subject: [PATCH 04/19] skip test case when 
version is not ok --- .../test/scala/com/nvidia/spark/rapids/ParquetScanSuite.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/ParquetScanSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/ParquetScanSuite.scala index 41e33261b20..f18c9f719b5 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/ParquetScanSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/ParquetScanSuite.scala @@ -93,6 +93,7 @@ class ParquetScanSuite extends SparkQueryCompareTestSuite { testSparkResultsAreEqual("Test Parquet unsigned int: uint8, uint16, uint32", frameFromParquet("unsigned-int.parquet")) { + assumeSpark320orLater frame => frame.select(col("*")) } } From 26f3584b45ee8bca2ac0080d39591ac563b6f3e5 Mon Sep 17 00:00:00 2001 From: Chong Gao Date: Tue, 14 Sep 2021 12:10:47 +0800 Subject: [PATCH 05/19] update test case Signed-off-by: Chong Gao --- .../scala/com/nvidia/spark/rapids/ParquetScanSuite.scala | 4 ++-- .../nvidia/spark/rapids/SparkQueryCompareTestSuite.scala | 6 +++++- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/ParquetScanSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/ParquetScanSuite.scala index f18c9f719b5..197eff370d0 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/ParquetScanSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/ParquetScanSuite.scala @@ -92,8 +92,8 @@ class ParquetScanSuite extends SparkQueryCompareTestSuite { StructField("c1_int", IntegerType))))) { frame => frame } testSparkResultsAreEqual("Test Parquet unsigned int: uint8, uint16, uint32", - frameFromParquet("unsigned-int.parquet")) { - assumeSpark320orLater + frameFromParquet("unsigned-int.parquet"), + assumeCondition = (_ => (spark320orLater, "Spark version not 3.2.0+"))) { frame => frame.select(col("*")) } } diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/SparkQueryCompareTestSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/SparkQueryCompareTestSuite.scala index e3e97439440..239f7b6185a 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/SparkQueryCompareTestSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/SparkQueryCompareTestSuite.scala @@ -1846,7 +1846,11 @@ trait SparkQueryCompareTestSuite extends FunSuite with Arm { assume(cmpSparkVersion(3, 2, 0) < 0, "Spark version not before 3.2.0") def assumeSpark320orLater = - assume(cmpSparkVersion(3, 2, 0) >= 0, "Spark version not 3.2.0+") + assume(spark320orLater, "Spark version not 3.2.0+") + + def spark320orLater: Boolean = { + cmpSparkVersion(3, 2, 0) >= 0 + } def cmpSparkVersion(major: Int, minor: Int, bugfix: Int): Int = { val sparkShimVersion = ShimLoader.getSparkShims.getSparkShimVersion From 48e824a6ff7b695ec143aa53c20c4121837e81e9 Mon Sep 17 00:00:00 2001 From: Chong Gao Date: Tue, 14 Sep 2021 18:09:03 +0800 Subject: [PATCH 06/19] refactor code Signed-off-by: Chong Gao --- .../nvidia/spark/rapids/GpuParquetScan.scala | 91 ++++++++----------- 1 file changed, 37 insertions(+), 54 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala index 02a0a95c30d..7dc5a4a7970 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala @@ -610,15 +610,12 @@ trait ParquetPartitionReaderBase extends Logging with Arm with ScanWithMetrics inputTable: Table, filePath: 
String, clippedSchema: MessageType): Table = { - convertToUnsigned(evolveSchemaIfNeededAndCloseImp(inputTable, filePath, clippedSchema)) - } - - private def evolveSchemaIfNeededAndCloseImp( - inputTable: Table, filePath: String, clippedSchema: MessageType) = { val precisions = getPrecisionsList(clippedSchema.asGroupType().getFields.asScala) // check if there are cols with precision that can be stored in an int - val typeCastingNeeded = precisions.exists(p => p <= Decimal.MAX_INT_DIGITS) - if (readDataSchema.length > inputTable.getNumberOfColumns || typeCastingNeeded) { + val decimalCastingNeeded = precisions.exists(p => p <= Decimal.MAX_INT_DIGITS) + val unsignedCastingNeeded = existsUnsignedType(clippedSchema.asGroupType()) + if (readDataSchema.length > inputTable.getNumberOfColumns + || decimalCastingNeeded || unsignedCastingNeeded) { // Spark+Parquet schema evolution is relatively simple with only adding/removing columns // To type casting or anything like that val clippedGroups = clippedSchema.asGroupType() @@ -630,15 +627,28 @@ trait ParquetPartitionReaderBase extends Logging with Arm with ScanWithMetrics val readField = readDataSchema(writeAt) if (areNamesEquiv(clippedGroups, readAt, readField.name, isSchemaCaseSensitive)) { val origCol = table.getColumn(readAt) - val col: ColumnVector = if (typeCastingNeeded) { - ColumnCastUtil.ifTrueThenDeepConvertTypeAtoTypeB(origCol, readField.dataType, - (dt, cv) => cv.getType.isDecimalType && - !GpuColumnVector.getNonNestedRapidsType(dt).equals(cv.getType()), - (dt, cv) => - cv.castTo(DecimalUtil.createCudfDecimal(dt.asInstanceOf[DecimalType]))) + + var col: ColumnVector = origCol + if (!(decimalCastingNeeded || unsignedCastingNeeded)) { + col = origCol.incRefCount() } else { - origCol.incRefCount() + if (decimalCastingNeeded) { + col = ColumnCastUtil.ifTrueThenDeepConvertTypeAtoTypeB( + col, readField.dataType, + (dt, cv) => cv.getType.isDecimalType && + !GpuColumnVector.getNonNestedRapidsType(dt).equals(cv.getType()), + (dt, cv) => + cv.castTo(DecimalUtil.createCudfDecimal(dt.asInstanceOf[DecimalType]))) + } + if (unsignedCastingNeeded) { + col = ColumnCastUtil.ifTrueThenDeepConvertTypeAtoTypeB( + col, readField.dataType, + (dt, cv) => needUnsignedToSignedCast(cv, dt), + (dt, cv) => + cv.castTo(DType.create(GpuColumnVector.getNonNestedRapidsType(dt).getTypeId))) + } } + newColumns(writeAt) = col readAt += 1 } else { @@ -663,56 +673,28 @@ trait ParquetPartitionReaderBase extends Logging with Arm with ScanWithMetrics /** * Convert cudf unsigned integer to wider signed integer that parquet expects * After spark 3.2.0, parquet read uint8 as int16, uint16 as int32, uint32 as int64 + * TODO uint64 -> Decimal(20,0) depends CUDF, see issue #3475 * - * @param inputTable the input table + * @param group the input table * @return converted table if the input table has unsigned integer */ - private def convertToUnsigned(inputTable: Table) = { - val typeCastingNeeded = needUnsignedCast(inputTable, readDataSchema) - if (typeCastingNeeded) { - val newColumns = new Array[ColumnVector](readDataSchema.length) - try { - withResource(inputTable) { table => - (0 until readDataSchema.length).foreach(i => { - val readField = readDataSchema(i) - val origCol = table.getColumn(i) - val col: ColumnVector = - ColumnCastUtil.ifTrueThenDeepConvertTypeAtoTypeB(origCol, readField.dataType, - (dt, cv) => needUnsignedToSignedCastFunction(cv, dt), - (dt, cv) => cv.castTo( - DType.create(GpuColumnVector.getNonNestedRapidsType(dt).getTypeId))) - newColumns(i) = col - }) + def 
existsUnsignedType(group: GroupType): Boolean = { + group.getFields.asScala.exists( + field => { + if (field.isPrimitive) { + hasUnsignedInt(field.getOriginalType) + } else { + existsUnsignedType(field.asGroupType) } - new Table(newColumns: _*) - } finally { - newColumns.safeClose() } - } else { - inputTable - } + ) } - def needUnsignedCast(table: Table, schema: StructType): Boolean = { - (0 until table.getNumberOfColumns).exists(i => - needUnsignedToSignedCast(table.getColumn(i), schema.fields(i).dataType)) + def hasUnsignedInt(t: OriginalType): Boolean = { + (t == OriginalType.UINT_8) || (t == OriginalType.UINT_16) || (t == OriginalType.UINT_32) } def needUnsignedToSignedCast(cv: ColumnView, dataType: DataType): Boolean = { - dataType match { - case a: ArrayType => - val child = cv.getChildColumnView(0) - needUnsignedToSignedCast(child, a.elementType) - case s: StructType => - (0 until cv.getNumChildren).exists(i => - needUnsignedToSignedCast(cv.getChildColumnView(i), s(i).dataType) - ) - case _ => - needUnsignedToSignedCastFunction(cv, dataType) - } - } - - def needUnsignedToSignedCastFunction(cv: ColumnView, dataType: DataType): Boolean = { (cv.getType.equals(DType.UINT8) && dataType.isInstanceOf[ShortType]) || (cv.getType.equals(DType.UINT16) && dataType.isInstanceOf[IntegerType]) || (cv.getType.equals(DType.UINT32) && dataType.isInstanceOf[LongType]) @@ -1424,3 +1406,4 @@ object ParquetPartitionReader { }) } } + From 416a94503c8c27c903aca0d6ed4d7248971bde4e Mon Sep 17 00:00:00 2001 From: Chong Gao Date: Tue, 14 Sep 2021 19:01:09 +0800 Subject: [PATCH 07/19] refactor code Signed-off-by: Chong Gao --- .../com/nvidia/spark/rapids/GpuParquetScan.scala | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala index 7dc5a4a7970..69a8980da04 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala @@ -671,18 +671,19 @@ trait ParquetPartitionReaderBase extends Logging with Arm with ScanWithMetrics } /** - * Convert cudf unsigned integer to wider signed integer that parquet expects + * Need to convert cudf unsigned integer to wider signed integer that parquet expects * After spark 3.2.0, parquet read uint8 as int16, uint16 as int32, uint32 as int64 * TODO uint64 -> Decimal(20,0) depends CUDF, see issue #3475 * - * @param group the input table - * @return converted table if the input table has unsigned integer + * @param group the schema + * @return if has unsigned integer */ def existsUnsignedType(group: GroupType): Boolean = { group.getFields.asScala.exists( field => { if (field.isPrimitive) { - hasUnsignedInt(field.getOriginalType) + val t = field.getOriginalType + (t == OriginalType.UINT_8) || (t == OriginalType.UINT_16) || (t == OriginalType.UINT_32) } else { existsUnsignedType(field.asGroupType) } @@ -690,10 +691,6 @@ trait ParquetPartitionReaderBase extends Logging with Arm with ScanWithMetrics ) } - def hasUnsignedInt(t: OriginalType): Boolean = { - (t == OriginalType.UINT_8) || (t == OriginalType.UINT_16) || (t == OriginalType.UINT_32) - } - def needUnsignedToSignedCast(cv: ColumnView, dataType: DataType): Boolean = { (cv.getType.equals(DType.UINT8) && dataType.isInstanceOf[ShortType]) || (cv.getType.equals(DType.UINT16) && dataType.isInstanceOf[IntegerType]) || From 75908afa5cf5e1bfd0fe298c3fcdf8fb822b06e9 Mon Sep 17 
00:00:00 2001 From: Chong Gao Date: Wed, 15 Sep 2021 09:54:12 +0800 Subject: [PATCH 08/19] fix conflict after merge Signed-off-by: Chong Gao --- .../test/scala/com/nvidia/spark/rapids/ParquetScanSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/ParquetScanSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/ParquetScanSuite.scala index 197eff370d0..d5345149b7b 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/ParquetScanSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/ParquetScanSuite.scala @@ -93,7 +93,7 @@ class ParquetScanSuite extends SparkQueryCompareTestSuite { testSparkResultsAreEqual("Test Parquet unsigned int: uint8, uint16, uint32", frameFromParquet("unsigned-int.parquet"), - assumeCondition = (_ => (spark320orLater, "Spark version not 3.2.0+"))) { + assumeCondition = (_ => (isSpark320OrLater, "Spark version not 3.2.0+"))) { frame => frame.select(col("*")) } } From 67ad4c1de27b761b91ab5dff3f84e6bd1e1c6584 Mon Sep 17 00:00:00 2001 From: Chong Gao Date: Wed, 15 Sep 2021 16:44:12 +0800 Subject: [PATCH 09/19] refactor Signed-off-by: Chong Gao --- .../nvidia/spark/rapids/GpuParquetScan.scala | 47 ++++++++++--------- 1 file changed, 24 insertions(+), 23 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala index 69a8980da04..b1992614f57 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala @@ -627,26 +627,12 @@ trait ParquetPartitionReaderBase extends Logging with Arm with ScanWithMetrics val readField = readDataSchema(writeAt) if (areNamesEquiv(clippedGroups, readAt, readField.name, isSchemaCaseSensitive)) { val origCol = table.getColumn(readAt) - - var col: ColumnVector = origCol - if (!(decimalCastingNeeded || unsignedCastingNeeded)) { - col = origCol.incRefCount() + val col: ColumnVector = if (decimalCastingNeeded || unsignedCastingNeeded) { + ColumnCastUtil.ifTrueThenDeepConvertTypeAtoTypeB(origCol, readField.dataType, + (dt, cv) => needDecimalCast(cv, dt) || needUnsignedToSignedCast(cv, dt), + (dt, cv) => decimalCastOrUnsignedCast(cv, dt)) } else { - if (decimalCastingNeeded) { - col = ColumnCastUtil.ifTrueThenDeepConvertTypeAtoTypeB( - col, readField.dataType, - (dt, cv) => cv.getType.isDecimalType && - !GpuColumnVector.getNonNestedRapidsType(dt).equals(cv.getType()), - (dt, cv) => - cv.castTo(DecimalUtil.createCudfDecimal(dt.asInstanceOf[DecimalType]))) - } - if (unsignedCastingNeeded) { - col = ColumnCastUtil.ifTrueThenDeepConvertTypeAtoTypeB( - col, readField.dataType, - (dt, cv) => needUnsignedToSignedCast(cv, dt), - (dt, cv) => - cv.castTo(DType.create(GpuColumnVector.getNonNestedRapidsType(dt).getTypeId))) - } + origCol.incRefCount() } newColumns(writeAt) = col @@ -691,10 +677,25 @@ trait ParquetPartitionReaderBase extends Logging with Arm with ScanWithMetrics ) } - def needUnsignedToSignedCast(cv: ColumnView, dataType: DataType): Boolean = { - (cv.getType.equals(DType.UINT8) && dataType.isInstanceOf[ShortType]) || - (cv.getType.equals(DType.UINT16) && dataType.isInstanceOf[IntegerType]) || - (cv.getType.equals(DType.UINT32) && dataType.isInstanceOf[LongType]) + def needDecimalCast(cv: ColumnView, dt: DataType): Boolean = { + cv.getType.isDecimalType && !GpuColumnVector.getNonNestedRapidsType(dt).equals(cv.getType()) + } + + def 
needUnsignedToSignedCast(cv: ColumnView, dt: DataType): Boolean = { + (cv.getType.equals(DType.UINT8) && dt.isInstanceOf[ShortType]) || + (cv.getType.equals(DType.UINT16) && dt.isInstanceOf[IntegerType]) || + (cv.getType.equals(DType.UINT32) && dt.isInstanceOf[LongType]) + } + + def decimalCastOrUnsignedCast(cv: ColumnView, dt: DataType): ColumnView = { + if (needDecimalCast(cv, dt)) { + cv.castTo(DecimalUtil.createCudfDecimal(dt.asInstanceOf[DecimalType])) + } else if (needUnsignedToSignedCast(cv, dt)) { + cv.castTo(DType.create(GpuColumnVector.getNonNestedRapidsType(dt).getTypeId)) + } else { + throw new IllegalStateException("Logical error: should only be " + + "decimal cast or unsigned to signed cast") + } } protected def readPartFile( From e219e4372a42c1594b3b7fbcbf8cc02416af1fa2 Mon Sep 17 00:00:00 2001 From: Chong Gao Date: Wed, 15 Sep 2021 16:53:03 +0800 Subject: [PATCH 10/19] add comment Signed-off-by: Chong Gao --- .../test/scala/com/nvidia/spark/rapids/ParquetScanSuite.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/ParquetScanSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/ParquetScanSuite.scala index d5345149b7b..a63d5a79ca4 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/ParquetScanSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/ParquetScanSuite.scala @@ -91,6 +91,7 @@ class ParquetScanSuite extends SparkQueryCompareTestSuite { StructField("c3_long", LongType), StructField("c1_int", IntegerType))))) { frame => frame } + // Column schema of unsigned-int.parquet is: [INT32 a(UINT_8), INT32 b(UINT_16), INT32 c(UINT_32)] testSparkResultsAreEqual("Test Parquet unsigned int: uint8, uint16, uint32", frameFromParquet("unsigned-int.parquet"), assumeCondition = (_ => (isSpark320OrLater, "Spark version not 3.2.0+"))) { From 1ed5f52b104c6e5cb33b99c41d6914862f30c404 Mon Sep 17 00:00:00 2001 From: Chong Gao Date: Fri, 17 Sep 2021 12:22:18 +0800 Subject: [PATCH 11/19] Update test case and refactor Signed-off-by: Chong Gao --- .../nvidia/spark/rapids/GpuParquetScan.scala | 14 +++--- tests/src/test/resources/unsigned-int.parquet | Bin 6040 -> 4996 bytes .../spark/rapids/ParquetScanSuite.scala | 42 +++++++++++++++++- 3 files changed, 49 insertions(+), 7 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala index b1992614f57..268cf768d74 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala @@ -610,12 +610,13 @@ trait ParquetPartitionReaderBase extends Logging with Arm with ScanWithMetrics inputTable: Table, filePath: String, clippedSchema: MessageType): Table = { + val precisions = getPrecisionsList(clippedSchema.asGroupType().getFields.asScala) // check if there are cols with precision that can be stored in an int - val decimalCastingNeeded = precisions.exists(p => p <= Decimal.MAX_INT_DIGITS) - val unsignedCastingNeeded = existsUnsignedType(clippedSchema.asGroupType()) + val hasDecimalAsInt = precisions.exists(p => p <= Decimal.MAX_INT_DIGITS) + val hasUnsignedType = existsUnsignedType(clippedSchema.asGroupType()) if (readDataSchema.length > inputTable.getNumberOfColumns - || decimalCastingNeeded || unsignedCastingNeeded) { + || hasDecimalAsInt || hasUnsignedType) { // Spark+Parquet schema evolution is relatively simple with only adding/removing columns // To type casting or anything 
like that
       val clippedGroups = clippedSchema.asGroupType()
@@ -627,14 +628,13 @@ trait ParquetPartitionReaderBase extends Logging with Arm with ScanWithMetrics
           val readField = readDataSchema(writeAt)
           if (areNamesEquiv(clippedGroups, readAt, readField.name, isSchemaCaseSensitive)) {
             val origCol = table.getColumn(readAt)
-            val col: ColumnVector = if (decimalCastingNeeded || unsignedCastingNeeded) {
+            val col: ColumnVector = if (hasDecimalAsInt || hasUnsignedType) {
               ColumnCastUtil.ifTrueThenDeepConvertTypeAtoTypeB(origCol, readField.dataType,
                 (dt, cv) => needDecimalCast(cv, dt) || needUnsignedToSignedCast(cv, dt),
                 (dt, cv) => decimalCastOrUnsignedCast(cv, dt))
             } else {
               origCol.incRefCount()
             }
-
             newColumns(writeAt) = col
             readAt += 1
           } else {
@@ -687,7 +687,9 @@ trait ParquetPartitionReaderBase extends Logging with Arm with ScanWithMetrics
       (cv.getType.equals(DType.UINT32) && dt.isInstanceOf[LongType])
   }
 
-  def decimalCastOrUnsignedCast(cv: ColumnView, dt: DataType): ColumnView = {
+  // Will do cast if needDecimalCast or needUnsignedToSignedCast test is true
+  // in ColumnCastUtil.ifTrueThenDeepConvertTypeAtoTypeB
+  private def decimalCastOrUnsignedCast(cv: ColumnView, dt: DataType): ColumnView = {
     if (needDecimalCast(cv, dt)) {
       cv.castTo(DecimalUtil.createCudfDecimal(dt.asInstanceOf[DecimalType]))
diff --git a/tests/src/test/resources/unsigned-int.parquet b/tests/src/test/resources/unsigned-int.parquet
index e2fd6f92b5fd703fe1e44adac1cd2aeefe141cf4..98ed89713e50a435f85807abad5bc46cb77a45d0 100644
GIT binary patch
literal 4996
[base85 binary patch data omitted]
literal 6040
[base85 binary patch data omitted]

diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/ParquetScanSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/ParquetScanSuite.scala
index a63d5a79ca4..2100f4b57b2 100644
--- a/tests/src/test/scala/com/nvidia/spark/rapids/ParquetScanSuite.scala
+++ b/tests/src/test/scala/com/nvidia/spark/rapids/ParquetScanSuite.scala
@@ -91,7 +91,47 @@ class ParquetScanSuite extends SparkQueryCompareTestSuite {
     StructField("c3_long", LongType),
     StructField("c1_int", IntegerType))))) { frame => frame }
 
-  // Column schema of unsigned-int.parquet is: [INT32 a(UINT_8), INT32 b(UINT_16), INT32 c(UINT_32)]
+  /**
+   * Column schema of unsigned-int.parquet is:
+   *
+   * message root {
+   *   required int32 a (UINT_8);   // uint_8
+   *   required int32 b (UINT_16);  // uint_16
+   *   required int32 c (UINT_32);  // uint_32
+   *
+   *   repeated int32 d (UINT_8);   // array of uint_8
+   *   repeated int32 e (UINT_16);  // array of uint_16
+   *   repeated int32 f (UINT_32);  // array of uint_32
+   *
+   *   repeated group g {           // array of struct(uint_8, uint_16, uint_32)
+   *     required int32 c1 (UINT_8);
+   *     required int32 c2 (UINT_16);
+   *     required int32 c3 (UINT_32);
+   *   }
+   *
+   *   required group m1 (MAP) {    // map of (uint_8, uint_8)
+   *     repeated group key_value {
+   *       required int32 key(UINT_8);
+   *       optional int32 value(UINT_8);
+   *     }
+   *   }
+   *
+   *   required group m2 (MAP) {    // map of (uint_16, uint_16)
+   *     repeated group key_value {
+   *       required int32 key(UINT_16);
+   *       optional int32 value(UINT_16);
+   *     }
+   *   }
+   *
+   *   required group m3 (MAP) {    // map of (uint_32, uint_32)
+   *     repeated group key_value {
+   *       required int32 key(UINT_32);
+   *       optional int32 value(UINT_32);
+   *     }
+   *   }
+   * }
+   *
+   */
   testSparkResultsAreEqual("Test Parquet unsigned int: uint8, uint16, uint32",
     frameFromParquet("unsigned-int.parquet"),
     assumeCondition = (_ => (isSpark320OrLater, "Spark version not 3.2.0+"))) {
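For reference, the schema comment above implies the following Spark-side read schema on 3.2.0+. The StructType below is a hedged illustration (it is not taken from the test suite), applying the same widening rule element by element to the nested columns:

```scala
import org.apache.spark.sql.types._

// Flat columns widen directly; arrays and maps widen their element,
// key and value types by the same rule.
val expectedReadSchema = StructType(Seq(
  StructField("a", ShortType, nullable = false),                 // INT32 (UINT_8)
  StructField("b", IntegerType, nullable = false),               // INT32 (UINT_16)
  StructField("c", LongType, nullable = false),                  // INT32 (UINT_32)
  StructField("d", ArrayType(ShortType, containsNull = false)),  // repeated UINT_8
  StructField("m1", MapType(ShortType, ShortType))               // map of (uint_8, uint_8)
))
```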
From 7db6e63461d2fef63403e9dc11a36a416b0ed977 Mon Sep 17 00:00:00 2001
From: Chong Gao
Date: Fri, 17 Sep 2021 13:23:11 +0800
Subject: [PATCH 12/19] add comment

Signed-off-by: Chong Gao
---
 .../main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala
index 268cf768d74..11cc904d736 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala
@@ -689,6 +689,9 @@ trait ParquetPartitionReaderBase extends Logging with Arm with ScanWithMetrics
 
   // Will do cast if needDecimalCast or needUnsignedToSignedCast test is true
   // in ColumnCastUtil.ifTrueThenDeepConvertTypeAtoTypeB
+  // Note: The behavior of unsigned to signed is decided by the spark parquet, this means the parameter dt
+  // is from spark meta module. Here is only following the behavior accordingly for GPU.
+  // This is suitable for all spark versions, no need to add to shim layer.
   private def decimalCastOrUnsignedCast(cv: ColumnView, dt: DataType): ColumnView = {
     if (needDecimalCast(cv, dt)) {
       cv.castTo(DecimalUtil.createCudfDecimal(dt.asInstanceOf[DecimalType]))

From 46213ea8474aef80dff35612889fac01f79d1912 Mon Sep 17 00:00:00 2001
From: Chong Gao
Date: Fri, 17 Sep 2021 13:54:57 +0800
Subject: [PATCH 13/19] add comment

Signed-off-by: Chong Gao
---
 tests/src/test/resources/unsigned-int.parquet | Bin 4996 -> 573 bytes
 .../spark/rapids/ParquetScanSuite.scala       |  33 +-----------------
 2 files changed, 1 insertion(+), 32 deletions(-)

diff --git a/tests/src/test/resources/unsigned-int.parquet b/tests/src/test/resources/unsigned-int.parquet
index 98ed89713e50a435f85807abad5bc46cb77a45d0..12a801aea2e5239cfc32a1f01b1134a419280dc2 100644
GIT binary patch
literal 573
[base85 binary patch data omitted]

literal 4996
[base85 binary patch data omitted]

diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/ParquetScanSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/ParquetScanSuite.scala
index 2100f4b57b2..4c76040b4ac 100644
--- a/tests/src/test/scala/com/nvidia/spark/rapids/ParquetScanSuite.scala
+++ b/tests/src/test/scala/com/nvidia/spark/rapids/ParquetScanSuite.scala
@@ -93,42 +93,11 @@ class ParquetScanSuite extends SparkQueryCompareTestSuite {
   /**
    * Column schema of unsigned-int.parquet is:
-   *
+   * TODO: nest type is blocked by issue: https://github.com/rapidsai/cudf/issues/9240
    * message root {
    *   required int32 a (UINT_8);   // uint_8
    *   required int32 b (UINT_16);  // uint_16
    *   required int32 c (UINT_32);  // uint_32
-   *
-   *   repeated int32 d (UINT_8);   // array of uint_8
-   *   repeated int32 e (UINT_16);  // array of uint_16
-   *   repeated int32 f (UINT_32);  // array of uint_32
-   *
-   *   repeated group g {           // array of struct(uint_8, uint_16, uint_32)
-   *     required int32 c1 (UINT_8);
-   *     required int32 c2 (UINT_16);
-   *     required int32 c3 (UINT_32);
-   *   }
-   *
-   *   required group m1 (MAP) {    // map of (uint_8, uint_8)
-   *     repeated group key_value {
-   *       required int32 key(UINT_8);
-   *       optional int32 value(UINT_8);
-   *     }
-   *   }
-   *
-   *   required group m2 (MAP) {    // map of (uint_16, uint_16)
-   *     repeated group key_value {
-   *       required int32 key(UINT_16);
-   *       optional int32 value(UINT_16);
-   *     }
-   *   }
-   *
-   *   required group m3 (MAP) {    // map of (uint_32, uint_32)
-   *     repeated group key_value {
-   *       required int32 key(UINT_32);
-   *       optional int32 value(UINT_32);
-   *     }
-   *   }
    * }
    *
    */
From 58579729af415e4f1802b130d946c2d490e62f1f Mon Sep 17 00:00:00 2001
From: Chong Gao
Date: Fri, 17 Sep 2021 16:38:06 +0800
Subject: [PATCH 14/19] fix read map type error

Signed-off-by: Chong Gao
---
 .../nvidia/spark/rapids/ColumnCastUtil.scala  |  24 +++++++++++--
 tests/src/test/resources/unsigned-int.parquet | Bin 573 -> 1636 bytes
 .../spark/rapids/ParquetScanSuite.scala       |  32 +++++++++++++++---
 3 files changed, 50 insertions(+), 6 deletions(-)

diff --git a/sql-plugin/src/main/java/com/nvidia/spark/rapids/ColumnCastUtil.scala b/sql-plugin/src/main/java/com/nvidia/spark/rapids/ColumnCastUtil.scala
index 83dd25a4212..2bc8245851c 100644
--- a/sql-plugin/src/main/java/com/nvidia/spark/rapids/ColumnCastUtil.scala
+++ b/sql-plugin/src/main/java/com/nvidia/spark/rapids/ColumnCastUtil.scala
@@ -18,9 +18,9 @@ package com.nvidia.spark.rapids
 
 import scala.collection.mutable.{ArrayBuffer, ArrayBuilder}
 
-import ai.rapids.cudf.{ColumnVector, ColumnView}
+import ai.rapids.cudf.{ColumnVector, ColumnView, DType}
 
-import org.apache.spark.sql.types.{ArrayType, DataType, StructType}
+import org.apache.spark.sql.types.{ArrayType, DataType, StructType, MapType, StructField}
 
 /**
  * This class casts a column to another column if the predicate passed resolves to true.
@@ -90,6 +90,26 @@ object ColumnCastUtil extends Arm {
         } else {
           cv
         }
+      case m: MapType =>
+        // map is list of structure
+        val struct = cv.getChildColumnView(0)
+        toClose += struct
+
+        if(cv.getType != DType.LIST || struct.getType != DType.STRUCT) {
+          throw new IllegalStateException("Map should be List(Structure) in column view")
+        }
+
+        val newChild = convertTypeAToTypeB(struct,
+          StructType(Array(StructField("", m.keyType), StructField("", m.valueType)))
+          , predicate, toClose)
+
+        if (struct == newChild) {
+          cv
+        } else {
+          val newView = cv.replaceListChild(newChild)
+          toClose += newView
+          newView
+        }
       case _ =>
         if (predicate(dataType, cv)) {
           val col = convert(dataType, cv)
diff --git a/tests/src/test/resources/unsigned-int.parquet b/tests/src/test/resources/unsigned-int.parquet
index 12a801aea2e5239cfc32a1f01b1134a419280dc2..ce4c1f8f804e0ff593c9663bf023a9b3e6c4fef1 100644
GIT binary patch
literal 1636
[base85 binary patch data omitted]

literal 573
[base85 binary patch data omitted]
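The fix above leans on cudf representing a map column as a list of key/value structs. A minimal self-contained sketch of that equivalence on the Spark type side (the empty field names mirror the patch; the valueContainsNull handling is an assumption, not taken from the repo):

```scala
import org.apache.spark.sql.types._

// A MapType can be walked as if it were ArrayType(StructType(key, value)),
// which is the shape cudf exposes: LIST<STRUCT<key, value>>.
def mapAsListOfStruct(m: MapType): ArrayType =
  ArrayType(
    StructType(Array(
      StructField("", m.keyType, nullable = false),
      StructField("", m.valueType, nullable = m.valueContainsNull))),
    containsNull = false)

// e.g. mapAsListOfStruct(MapType(ByteType, ByteType)) yields a list of
// two-field structs, so the recursive cast can descend into keys and values.
```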
From: Chong Gao
Date: Fri, 17 Sep 2021 17:06:21 +0800
Subject: [PATCH 15/19] fix line length exceeds 100

Signed-off-by: Chong Gao
---
 .../scala/com/nvidia/spark/rapids/GpuParquetScan.scala | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala
index 11cc904d736..4bd12be79e5 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala
@@ -688,9 +688,10 @@ trait ParquetPartitionReaderBase extends Logging with Arm with ScanWithMetrics
   }
 
   // Will do cast if needDecimalCast or needUnsignedToSignedCast test is true
-  // in ColumnCastUtil.ifTrueThenDeepConvertTypeAtoTypeB
-  // Note: The behavior of unsigned to signed is decided by the spark parquet, this means the parameter dt
-  // is from spark meta module. Here is only following the behavior accordingly for GPU.
+  // in ColumnCastUtil.ifTrueThenDeepConvertTypeAtoTypeB.
+  // Note: The behavior of unsigned to signed is decided by the spark parquet,
+  // this means the parameter dt is from spark meta module.
+  // Here is only following the behavior accordingly for GPU.
   // This is suitable for all spark versions, no need to add to shim layer.
   private def decimalCastOrUnsignedCast(cv: ColumnView, dt: DataType): ColumnView = {
     if (needDecimalCast(cv, dt)) {
       cv.castTo(DecimalUtil.createCudfDecimal(dt.asInstanceOf[DecimalType]))

From 10da87d9ceefe61fb146b8437a139503c0c57ebb Mon Sep 17 00:00:00 2001
From: Chong Gao
Date: Fri, 17 Sep 2021 17:57:43 +0800
Subject: [PATCH 16/19] add map(uint,struct(uint,uint,uint)) test

Signed-off-by: Chong Gao
---
 tests/src/test/resources/unsigned-int.parquet | Bin 1636 -> 2256 bytes
 .../spark/rapids/ParquetScanSuite.scala       |  10 ++++++++++
 2 files changed, 10 insertions(+)

diff --git a/tests/src/test/resources/unsigned-int.parquet b/tests/src/test/resources/unsigned-int.parquet
index ce4c1f8f804e0ff593c9663bf023a9b3e6c4fef1..62fbcc9f798b1ba30bcceb1c63652735ae98709a 100644
GIT binary patch
delta 365
[base85 binary patch data omitted]

From: Chong Gao
Date: Wed, 22 Sep 2021 11:42:52 +0800
Subject: [PATCH 17/19] update comments

Signed-off-by: Chong Gao
---
 .../java/com/nvidia/spark/rapids/ColumnCastUtil.scala  | 4 ++--
 .../scala/com/nvidia/spark/rapids/GpuParquetScan.scala | 8 ++++----
 .../com/nvidia/spark/rapids/ParquetScanSuite.scala     | 2 ++
 3 files changed, 8 insertions(+), 6 deletions(-)

diff --git a/sql-plugin/src/main/java/com/nvidia/spark/rapids/ColumnCastUtil.scala b/sql-plugin/src/main/java/com/nvidia/spark/rapids/ColumnCastUtil.scala
index 2bc8245851c..1ad47dfb756 100644
--- a/sql-plugin/src/main/java/com/nvidia/spark/rapids/ColumnCastUtil.scala
+++ b/sql-plugin/src/main/java/com/nvidia/spark/rapids/ColumnCastUtil.scala
@@ -100,8 +100,8 @@ object ColumnCastUtil extends Arm {
         }
 
         val newChild = convertTypeAToTypeB(struct,
-          StructType(Array(StructField("", m.keyType), StructField("", m.valueType)))
-          , predicate, toClose)
+          StructType(Array(StructField("", m.keyType), StructField("", m.valueType))),
+          predicate, toClose)
 
         if (struct == newChild) {
           cv
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala
index 4bd12be79e5..d169943c664 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala
@@ -689,10 +689,10 @@ trait ParquetPartitionReaderBase extends Logging with Arm with ScanWithMetrics
 
   // Will do cast if needDecimalCast or needUnsignedToSignedCast test is true
   // in ColumnCastUtil.ifTrueThenDeepConvertTypeAtoTypeB.
-  // Note: The behavior of unsigned to signed is decided by the spark parquet,
-  // this means the parameter dt is from spark meta module.
-  // Here is only following the behavior accordingly for GPU.
-  // This is suitable for all spark versions, no need to add to shim layer.
+  // Note: The behavior of unsigned to signed is decided by the Spark,
+  // this means the parameter dt is from Spark meta module.
+  // This implements the requested type behavior accordingly for GPU.
+  // This is suitable for all Spark versions, no need to add to shim layer.
private def decimalCastOrUnsignedCast(cv: ColumnView, dt: DataType): ColumnView = { if (needDecimalCast(cv, dt)) { cv.castTo(DecimalUtil.createCudfDecimal(dt.asInstanceOf[DecimalType])) diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/ParquetScanSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/ParquetScanSuite.scala index 25282bff3fa..e1c6d6f7394 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/ParquetScanSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/ParquetScanSuite.scala @@ -137,6 +137,8 @@ class ParquetScanSuite extends SparkQueryCompareTestSuite { */ testSparkResultsAreEqual("Test Parquet unsigned int: uint8, uint16, uint32", frameFromParquet("unsigned-int.parquet"), + // CPU version throws an exception when Spark < 3.2, so skip when Spark < 3.2. + // The exception is like "Parquet type not supported: INT32 (UINT_8)" assumeCondition = (_ => (isSpark320OrLater, "Spark version not 3.2.0+"))) { frame => frame.select(col("*")) } From 37322e1ced464f98d44a93961c0955b772363685 Mon Sep 17 00:00:00 2001 From: Chong Gao Date: Wed, 22 Sep 2021 11:52:57 +0800 Subject: [PATCH 18/19] update comments Signed-off-by: Chong Gao --- .../main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala index d169943c664..8cc0f334a29 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala @@ -657,8 +657,8 @@ trait ParquetPartitionReaderBase extends Logging with Arm with ScanWithMetrics } /** - * Need to convert cudf unsigned integer to wider signed integer that parquet expects - * After spark 3.2.0, parquet read uint8 as int16, uint16 as int32, uint32 as int64 + * Need to convert cudf unsigned integer to wider signed integer that Spark expects + * After Spark 3.2.0, Spark reads uint8 as int16, uint16 as int32, uint32 as int64 * TODO uint64 -> Decimal(20,0) depends CUDF, see issue #3475 * * @param group the schema From f49be8a6a756ef43737359e44915be85b155e413 Mon Sep 17 00:00:00 2001 From: Chong Gao Date: Wed, 22 Sep 2021 13:26:22 +0800 Subject: [PATCH 19/19] fix after merge Signed-off-by: Chong Gao --- .../test/scala/com/nvidia/spark/rapids/ParquetScanSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/ParquetScanSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/ParquetScanSuite.scala index e1c6d6f7394..b4b89059a23 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/ParquetScanSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/ParquetScanSuite.scala @@ -139,7 +139,7 @@ class ParquetScanSuite extends SparkQueryCompareTestSuite { frameFromParquet("unsigned-int.parquet"), // CPU version throws an exception when Spark < 3.2, so skip when Spark < 3.2. // The exception is like "Parquet type not supported: INT32 (UINT_8)" - assumeCondition = (_ => (isSpark320OrLater, "Spark version not 3.2.0+"))) { + assumeCondition = (_ => (VersionUtils.isSpark320OrLater, "Spark version not 3.2.0+"))) { frame => frame.select(col("*")) } }
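Taken together, the series makes the following end-to-end scenario work. This is a hedged usage sketch, not code from the repo: the session configuration and file path are illustrative, and only the plugin entry point and the widened column types come from the patches above.

```scala
import org.apache.spark.sql.SparkSession

object UnsignedReadExample {
  def main(args: Array[String]): Unit = {
    // Enable the RAPIDS Accelerator through the standard Spark plugin hook.
    val spark = SparkSession.builder()
      .appName("read-unsigned-parquet")
      .config("spark.plugins", "com.nvidia.spark.SQLPlugin")
      .getOrCreate()

    // On Spark 3.2.0+, UINT_8/UINT_16/UINT_32 columns are reported widened
    // (a: smallint, b: int, c: bigint), and after this series the GPU read
    // produces the same types as the CPU reader.
    val df = spark.read.parquet("unsigned-int.parquet")
    df.printSchema()
    df.show()
  }
}
```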