From 0de1945edf7bb9cc5f63a6a07f67634db664d2ad Mon Sep 17 00:00:00 2001
From: Chong Gao
Date: Tue, 1 Mar 2022 16:29:00 +0800
Subject: [PATCH 1/5] Temporarily disable write/read parquet when schema has a
 specified Parquet field ID

Signed-off-by: Chong Gao
---
 .../rapids/shims/v2/ParquetFieldIdShims.scala | 11 +++
 .../rapids/shims/v2/ParquetFieldIdShims.scala | 18 +++++
 .../spark/rapids/GpuParquetFileFormat.scala   |  2 +
 .../spark/rapids/GpuParquetScanBase.scala     |  3 +
 tests/pom.xml                                 | 30 +++++++
 .../spark/rapids/ParquetFieldIdSuite.scala    | 78 +++++++++++++++++++
 6 files changed, 142 insertions(+)
 create mode 100644 tests/src/test/330+/scala/com/nvidia/spark/rapids/ParquetFieldIdSuite.scala

diff --git a/sql-plugin/src/main/301until330-all/scala/com/nvidia/spark/rapids/shims/v2/ParquetFieldIdShims.scala b/sql-plugin/src/main/301until330-all/scala/com/nvidia/spark/rapids/shims/v2/ParquetFieldIdShims.scala
index 9a87dce3785..82113c3cc6c 100644
--- a/sql-plugin/src/main/301until330-all/scala/com/nvidia/spark/rapids/shims/v2/ParquetFieldIdShims.scala
+++ b/sql-plugin/src/main/301until330-all/scala/com/nvidia/spark/rapids/shims/v2/ParquetFieldIdShims.scala
@@ -16,13 +16,24 @@
 
 package com.nvidia.spark.rapids.shims.v2
 
+import com.nvidia.spark.rapids.RapidsMeta
 import org.apache.hadoop.conf.Configuration
 
+import org.apache.spark.sql.RuntimeConfig
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
 
 object ParquetFieldIdShims {
   /** Updates the Hadoop configuration with the Parquet field ID write setting from SQLConf */
   def setupParquetFieldIdWriteConfig(conf: Configuration, sqlConf: SQLConf): Unit = {
     // Parquet field ID support configs are not supported until Spark 3.3
   }
+
+  def tagGpuSupportForFieldId(meta: RapidsMeta[_, _, _], schema: StructType): Unit = {
+    // Parquet field ID support configs are not supported until Spark 3.3
+  }
+
+  def tagGpuSupportReadForFieldId(meta: RapidsMeta[_, _, _], conf: RuntimeConfig): Unit = {
+    // Parquet field ID support configs are not supported until Spark 3.3
+  }
 }
diff --git a/sql-plugin/src/main/330+/scala/com/nvidia/spark/rapids/shims/v2/ParquetFieldIdShims.scala b/sql-plugin/src/main/330+/scala/com/nvidia/spark/rapids/shims/v2/ParquetFieldIdShims.scala
index db7b25dc255..998dd164bb9 100644
--- a/sql-plugin/src/main/330+/scala/com/nvidia/spark/rapids/shims/v2/ParquetFieldIdShims.scala
+++ b/sql-plugin/src/main/330+/scala/com/nvidia/spark/rapids/shims/v2/ParquetFieldIdShims.scala
@@ -16,9 +16,13 @@
 package com.nvidia.spark.rapids.shims.v2
 
+import com.nvidia.spark.rapids.RapidsMeta
 import org.apache.hadoop.conf.Configuration
 
+import org.apache.spark.sql.RuntimeConfig
+import org.apache.spark.sql.execution.datasources.parquet.ParquetUtils
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
 
 object ParquetFieldIdShims {
   /** Updates the Hadoop configuration with the Parquet field ID write setting from SQLConf */
   def setupParquetFieldIdWriteConfig(conf: Configuration, sqlConf: SQLConf): Unit = {
@@ -27,4 +31,18 @@
       SQLConf.PARQUET_FIELD_ID_WRITE_ENABLED.key,
       sqlConf.parquetFieldIdWriteEnabled.toString)
   }
+
+  def tagGpuSupportWriteForFieldId(meta: RapidsMeta[_, _, _], schema: StructType): Unit = {
+    if (ParquetUtils.hasFieldIds(schema)) {
+      meta.willNotWorkOnGpu(
+        "Currently not support 'parquet.field.id' in parquet writer, schema is " + schema.json)
+    }
+  }
+
+  def tagGpuSupportReadForFieldId(meta: RapidsMeta[_, _, _], conf: RuntimeConfig): Unit = {
+    if(conf.get(SQLConf.PARQUET_FIELD_ID_READ_ENABLED.key, "false").toBoolean) {
meta.willNotWorkOnGpu("Currently not support reading field ids, " + + "please set spark.sql.parquet.fieldId.read.enabled as false") + } + } } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetFileFormat.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetFileFormat.scala index 9031da88f25..4a3bea60f1c 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetFileFormat.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetFileFormat.scala @@ -43,6 +43,8 @@ object GpuParquetFileFormat { options: Map[String, String], schema: StructType): Option[GpuParquetFileFormat] = { + ParquetFieldIdShims.tagGpuSupportWriteForFieldId(meta, schema) + val sqlConf = spark.sessionState.conf val parquetOptions = new ParquetOptions(options, sqlConf) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScanBase.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScanBase.scala index 2c8842267de..b743ecc0f0e 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScanBase.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScanBase.scala @@ -33,6 +33,7 @@ import com.nvidia.spark.RebaseHelper import com.nvidia.spark.rapids.GpuMetric._ import com.nvidia.spark.rapids.ParquetPartitionReader.CopyRange import com.nvidia.spark.rapids.RapidsPluginImplicits._ +import com.nvidia.spark.rapids.shims.v2.ParquetFieldIdShims import org.apache.commons.io.output.{CountingOutputStream, NullOutputStream} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FSDataInputStream, Path} @@ -143,6 +144,8 @@ object GpuParquetScanBase { meta: RapidsMeta[_, _, _]): Unit = { val sqlConf = sparkSession.conf + ParquetFieldIdShims.tagGpuSupportReadForFieldId(meta, sqlConf) + if (!meta.conf.isParquetEnabled) { meta.willNotWorkOnGpu("Parquet input and output has been disabled. To enable set" + s"${RapidsConf.ENABLE_PARQUET} to true") diff --git a/tests/pom.xml b/tests/pom.xml index 7247c7b37d4..19912f88344 100644 --- a/tests/pom.xml +++ b/tests/pom.xml @@ -238,6 +238,36 @@ + + release330 + + + buildver + 330 + + + + + + + org.codehaus.mojo + build-helper-maven-plugin + + + add-profile-test-src + add-test-source + + + + ${project.basedir}/src/test/330+/scala + + + + + + + + diff --git a/tests/src/test/330+/scala/com/nvidia/spark/rapids/ParquetFieldIdSuite.scala b/tests/src/test/330+/scala/com/nvidia/spark/rapids/ParquetFieldIdSuite.scala new file mode 100644 index 00000000000..4c3a7525802 --- /dev/null +++ b/tests/src/test/330+/scala/com/nvidia/spark/rapids/ParquetFieldIdSuite.scala @@ -0,0 +1,78 @@ +/* + * Copyright (c) 2022, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package com.nvidia.spark.rapids
+
+import java.io.File
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.execution.datasources.parquet.ParquetUtils
+import org.apache.spark.sql.types.{IntegerType, MetadataBuilder, StructType}
+
+/**
+ * TODO should update after cuDF supports field id
+ * See https://github.com/NVIDIA/spark-rapids/issues/4846
+ */
+class ParquetFieldIdSuite extends SparkQueryCompareTestSuite {
+
+  // this should failed
+  test("try to write field id") {
+    val tmpFile = File.createTempFile("field-id", ".parquet")
+    try {
+      def withId(id: Int) =
+        new MetadataBuilder().putLong(ParquetUtils.FIELD_ID_METADATA_KEY, id).build()
+      // not support writing field id
+      val schema = new StructType().add("c1", IntegerType, nullable = true, withId(1))
+      val data = (1 to 4).map(i => Row(i))
+
+      assertThrows[IllegalArgumentException] {
+        withGpuSparkSession(
+          spark => spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
+              .write.mode("overwrite").parquet(tmpFile.getAbsolutePath)
+        )
+      }
+    } finally {
+      tmpFile.delete()
+    }
+  }
+
+  // this should failed
+  test("try to read field id") {
+    val tmpFile = File.createTempFile("field-id", ".parquet")
+    try {
+      def withId(id: Int) =
+        new MetadataBuilder().putLong(ParquetUtils.FIELD_ID_METADATA_KEY, id).build()
+      val schema = new StructType().add("c1", IntegerType, nullable = true, withId(1))
+      val data = (1 to 4).map(i => Row(i))
+
+      withCpuSparkSession(
+        spark => spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
+            .write.mode("overwrite").parquet(tmpFile.getAbsolutePath)
+      )
+
+      assertThrows[IllegalArgumentException] {
+        withGpuSparkSession(
+          spark => spark.read.parquet(tmpFile.getAbsolutePath).collect(),
+          // not support read field id
+          new SparkConf().set("spark.sql.parquet.fieldId.read.enabled", "true")
+        )
+      }
+    } finally {
+      tmpFile.delete()
+    }
+  }
+}

From 3e095d532344be4bb908ae7bd1736274aea74ce8 Mon Sep 17 00:00:00 2001
From: Chong Gao
Date: Tue, 1 Mar 2022 16:51:33 +0800
Subject: [PATCH 2/5] A fix

---
 .../com/nvidia/spark/rapids/shims/v2/ParquetFieldIdShims.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql-plugin/src/main/301until330-all/scala/com/nvidia/spark/rapids/shims/v2/ParquetFieldIdShims.scala b/sql-plugin/src/main/301until330-all/scala/com/nvidia/spark/rapids/shims/v2/ParquetFieldIdShims.scala
index 82113c3cc6c..a9480a84d65 100644
--- a/sql-plugin/src/main/301until330-all/scala/com/nvidia/spark/rapids/shims/v2/ParquetFieldIdShims.scala
+++ b/sql-plugin/src/main/301until330-all/scala/com/nvidia/spark/rapids/shims/v2/ParquetFieldIdShims.scala
@@ -29,7 +29,7 @@ object ParquetFieldIdShims {
     // Parquet field ID support configs are not supported until Spark 3.3
   }
 
-  def tagGpuSupportForFieldId(meta: RapidsMeta[_, _, _], schema: StructType): Unit = {
+  def tagGpuSupportWriteForFieldId(meta: RapidsMeta[_, _, _], schema: StructType): Unit = {
     // Parquet field ID support configs are not supported until Spark 3.3
   }
 

From c1f63138c530ebe722fd83264961053a070a9700 Mon Sep 17 00:00:00 2001
From: Chong Gao
Date: Wed, 2 Mar 2022 13:18:40 +0800
Subject: [PATCH 3/5] Update tests, fixes

---
 .../src/main/python/parquet_test.py           | 22 +++++-
 .../src/main/python/parquet_write_test.py     | 18 ++++-
 .../rapids/shims/v2/ParquetFieldIdShims.scala |  6 +-
 .../rapids/shims/v2/ParquetFieldIdShims.scala | 16 ++--
 .../spark/rapids/GpuParquetFileFormat.scala   |  4 +-
 .../spark/rapids/GpuParquetScanBase.scala     |  2 +-
 tests/pom.xml                                 | 30 -------
 .../spark/rapids/ParquetFieldIdSuite.scala    | 78 ------------------
 8 files changed, 52 insertions(+), 124 deletions(-)
 delete mode 100644 tests/src/test/330+/scala/com/nvidia/spark/rapids/ParquetFieldIdSuite.scala

diff --git a/integration_tests/src/main/python/parquet_test.py b/integration_tests/src/main/python/parquet_test.py
index 01ec9bd44f4..eb52ac34568 100644
--- a/integration_tests/src/main/python/parquet_test.py
+++ b/integration_tests/src/main/python/parquet_test.py
@@ -768,4 +768,24 @@ def test_parquet_read_with_corrupt_files(spark_tmp_path, reader_confs, v1_enable
 
     assert_gpu_and_cpu_are_equal_collect(
             lambda spark : spark.read.parquet(first_data_path, second_data_path, third_data_path),
-            conf=all_confs)
\ No newline at end of file
+            conf=all_confs)
+
+# should fallback when trying to read with field ID
+@pytest.mark.skipif(is_before_spark_330(), reason='Field ID is not supported before Spark 330')
+@allow_non_gpu("FileSourceScanExec", "ColumnarToRowExec")
+def test_parquet_read_field_id(spark_tmp_path):
+    data_path = spark_tmp_path + '/PARQUET_DATA'
+    schema = StructType([
+        StructField("c1", IntegerType(), metadata={'parquet.field.id' : 1}),
+    ])
+    data = [(1,),(2,),(3,),]
+    # write parquet with field IDs
+    with_cpu_session(lambda spark :spark.createDataFrame(data, schema).coalesce(1).write.mode("overwrite").parquet(data_path))
+
+    readSchema = StructType([
+        StructField("mapped_name_xxx", IntegerType(), metadata={'parquet.field.id' : 1}),
+    ])
+    assert_gpu_fallback_collect(
+        lambda spark: spark.read.schema(readSchema).parquet(data_path),
+        'FileSourceScanExec',
+        {"spark.sql.parquet.fieldId.read.enabled": "true"}) # default is false
diff --git a/integration_tests/src/main/python/parquet_write_test.py b/integration_tests/src/main/python/parquet_write_test.py
index 993c0575d13..fd556830135 100644
--- a/integration_tests/src/main/python/parquet_write_test.py
+++ b/integration_tests/src/main/python/parquet_write_test.py
@@ -19,7 +19,7 @@
 from data_gen import *
 from marks import *
 from pyspark.sql.types import *
-from spark_session import with_cpu_session, with_gpu_session
+from spark_session import with_cpu_session, with_gpu_session, is_before_spark_330
 import pyspark.sql.functions as f
 import pyspark.sql.utils
 import random
@@ -402,3 +402,19 @@ def create_empty_df(spark, path):
         lambda spark, path: spark.read.parquet(path),
         data_path,
         conf=writer_confs)
+
+# should fallback when trying to write field ID metadata
+@pytest.mark.skipif(is_before_spark_330(), reason='Field ID is not supported before Spark 330')
+@allow_non_gpu('DataWritingCommandExec')
+def test_parquet_write_field_id(spark_tmp_path):
+    data_path = spark_tmp_path + '/PARQUET_DATA'
+    schema = StructType([
+        StructField("c1", IntegerType(), metadata={'parquet.field.id' : 1}),
+    ])
+    data = [(1,),(2,),(3,),]
+    assert_gpu_fallback_write(
+        lambda spark, path: spark.createDataFrame(data, schema).coalesce(1).write.mode("overwrite").parquet(path),
+        lambda spark, path: spark.read.parquet(path),
+        data_path,
+        'DataWritingCommandExec',
+        conf = {"spark.sql.parquet.fieldId.write.enabled" : "true"}) # default is true
diff --git a/sql-plugin/src/main/301until330-all/scala/com/nvidia/spark/rapids/shims/v2/ParquetFieldIdShims.scala b/sql-plugin/src/main/301until330-all/scala/com/nvidia/spark/rapids/shims/v2/ParquetFieldIdShims.scala
index a9480a84d65..3830b291961 100644
--- a/sql-plugin/src/main/301until330-all/scala/com/nvidia/spark/rapids/shims/v2/ParquetFieldIdShims.scala
+++ b/sql-plugin/src/main/301until330-all/scala/com/nvidia/spark/rapids/shims/v2/ParquetFieldIdShims.scala
@@ -19,7 +19,6 @@ package com.nvidia.spark.rapids.shims.v2
 import com.nvidia.spark.rapids.RapidsMeta
 import org.apache.hadoop.conf.Configuration
 
-import org.apache.spark.sql.RuntimeConfig
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.StructType
 
@@ -29,11 +28,12 @@ object ParquetFieldIdShims {
     // Parquet field ID support configs are not supported until Spark 3.3
   }
 
-  def tagGpuSupportWriteForFieldId(meta: RapidsMeta[_, _, _], schema: StructType): Unit = {
+  def tagGpuSupportWriteForFieldId(meta: RapidsMeta[_, _, _], schema: StructType,
+      conf: SQLConf): Unit = {
     // Parquet field ID support configs are not supported until Spark 3.3
   }
 
-  def tagGpuSupportReadForFieldId(meta: RapidsMeta[_, _, _], conf: RuntimeConfig): Unit = {
+  def tagGpuSupportReadForFieldId(meta: RapidsMeta[_, _, _], conf: SQLConf): Unit = {
     // Parquet field ID support configs are not supported until Spark 3.3
   }
 }
diff --git a/sql-plugin/src/main/330+/scala/com/nvidia/spark/rapids/shims/v2/ParquetFieldIdShims.scala b/sql-plugin/src/main/330+/scala/com/nvidia/spark/rapids/shims/v2/ParquetFieldIdShims.scala
index 998dd164bb9..2305765959d 100644
--- a/sql-plugin/src/main/330+/scala/com/nvidia/spark/rapids/shims/v2/ParquetFieldIdShims.scala
+++ b/sql-plugin/src/main/330+/scala/com/nvidia/spark/rapids/shims/v2/ParquetFieldIdShims.scala
@@ -19,7 +19,6 @@ package com.nvidia.spark.rapids.shims.v2
 import com.nvidia.spark.rapids.RapidsMeta
 import org.apache.hadoop.conf.Configuration
 
-import org.apache.spark.sql.RuntimeConfig
 import org.apache.spark.sql.execution.datasources.parquet.ParquetUtils
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.StructType
@@ -32,17 +31,18 @@ object ParquetFieldIdShims {
       sqlConf.parquetFieldIdWriteEnabled.toString)
   }
 
-  def tagGpuSupportWriteForFieldId(meta: RapidsMeta[_, _, _], schema: StructType): Unit = {
-    if (ParquetUtils.hasFieldIds(schema)) {
+  def tagGpuSupportWriteForFieldId(meta: RapidsMeta[_, _, _], schema: StructType,
+      conf: SQLConf): Unit = {
+    if (conf.parquetFieldIdWriteEnabled && ParquetUtils.hasFieldIds(schema)) {
       meta.willNotWorkOnGpu(
-        "Currently not support 'parquet.field.id' in parquet writer, schema is " + schema.json)
+        "field IDs are not supported for Parquet writes, schema is " + schema.json)
     }
   }
 
-  def tagGpuSupportReadForFieldId(meta: RapidsMeta[_, _, _], conf: RuntimeConfig): Unit = {
-    if(conf.get(SQLConf.PARQUET_FIELD_ID_READ_ENABLED.key, "false").toBoolean) {
-      meta.willNotWorkOnGpu("Currently not support reading field ids, " +
-        "please set spark.sql.parquet.fieldId.read.enabled as false")
+  def tagGpuSupportReadForFieldId(meta: RapidsMeta[_, _, _], conf: SQLConf): Unit = {
+    if(conf.parquetFieldIdReadEnabled) {
+      meta.willNotWorkOnGpu("reading by Parquet field ID is not supported, " +
+        "SQLConf.PARQUET_FIELD_ID_READ_ENABLED is true")
     }
   }
 }
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetFileFormat.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetFileFormat.scala
index 4a3bea60f1c..a142d25c1e5 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetFileFormat.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetFileFormat.scala
@@ -42,10 +42,10 @@
     spark: SparkSession,
     options: Map[String, String],
     schema: StructType): Option[GpuParquetFileFormat] = {
+    val sqlConf = spark.sessionState.conf
 
-    ParquetFieldIdShims.tagGpuSupportWriteForFieldId(meta, schema)
+    ParquetFieldIdShims.tagGpuSupportWriteForFieldId(meta, schema, sqlConf)
 
-    val sqlConf = spark.sessionState.conf
     val parquetOptions = new ParquetOptions(options, sqlConf)
 
     if (!meta.conf.isParquetEnabled) {
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScanBase.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScanBase.scala
index b743ecc0f0e..e629d4b0c95 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScanBase.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScanBase.scala
@@ -144,7 +144,7 @@
       meta: RapidsMeta[_, _, _]): Unit = {
     val sqlConf = sparkSession.conf
 
-    ParquetFieldIdShims.tagGpuSupportReadForFieldId(meta, sqlConf)
+    ParquetFieldIdShims.tagGpuSupportReadForFieldId(meta, sparkSession.sessionState.conf)
 
     if (!meta.conf.isParquetEnabled) {
       meta.willNotWorkOnGpu("Parquet input and output has been disabled. To enable set" +
diff --git a/tests/pom.xml b/tests/pom.xml
index 19912f88344..7247c7b37d4 100644
--- a/tests/pom.xml
+++ b/tests/pom.xml
@@ -238,36 +238,6 @@
                 </plugins>
             </build>
         </profile>
-        <profile>
-            <id>release330</id>
-            <activation>
-                <property>
-                    <name>buildver</name>
-                    <value>330</value>
-                </property>
-            </activation>
-            <build>
-                <plugins>
-                    <plugin>
-                        <groupId>org.codehaus.mojo</groupId>
-                        <artifactId>build-helper-maven-plugin</artifactId>
-                        <executions>
-                            <execution>
-                                <id>add-profile-test-src</id>
-                                <goals>
-                                    <goal>add-test-source</goal>
-                                </goals>
-                                <configuration>
-                                    <sources>
-                                        <source>${project.basedir}/src/test/330+/scala</source>
-                                    </sources>
-                                </configuration>
-                            </execution>
-                        </executions>
-                    </plugin>
-                </plugins>
-            </build>
-        </profile>
     </profiles>
 
     <dependencies>
diff --git a/tests/src/test/330+/scala/com/nvidia/spark/rapids/ParquetFieldIdSuite.scala b/tests/src/test/330+/scala/com/nvidia/spark/rapids/ParquetFieldIdSuite.scala
deleted file mode 100644
index 4c3a7525802..00000000000
--- a/tests/src/test/330+/scala/com/nvidia/spark/rapids/ParquetFieldIdSuite.scala
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Copyright (c) 2022, NVIDIA CORPORATION.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.nvidia.spark.rapids
-
-import java.io.File
-
-import org.apache.spark.SparkConf
-import org.apache.spark.sql.Row
-import org.apache.spark.sql.execution.datasources.parquet.ParquetUtils
-import org.apache.spark.sql.types.{IntegerType, MetadataBuilder, StructType}
-
-/**
- * TODO should update after cuDF supports field id
- * See https://github.com/NVIDIA/spark-rapids/issues/4846
- */
-class ParquetFieldIdSuite extends SparkQueryCompareTestSuite {
-
-  // this should failed
-  test("try to write field id") {
-    val tmpFile = File.createTempFile("field-id", ".parquet")
-    try {
-      def withId(id: Int) =
-        new MetadataBuilder().putLong(ParquetUtils.FIELD_ID_METADATA_KEY, id).build()
-      // not support writing field id
-      val schema = new StructType().add("c1", IntegerType, nullable = true, withId(1))
-      val data = (1 to 4).map(i => Row(i))
-
-      assertThrows[IllegalArgumentException] {
-        withGpuSparkSession(
-          spark => spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
-              .write.mode("overwrite").parquet(tmpFile.getAbsolutePath)
-        )
-      }
-    } finally {
-      tmpFile.delete()
-    }
-  }
-
-  // this should failed
-  test("try to read field id") {
-    val tmpFile = File.createTempFile("field-id", ".parquet")
-    try {
-      def withId(id: Int) =
-        new MetadataBuilder().putLong(ParquetUtils.FIELD_ID_METADATA_KEY, id).build()
-      val schema = new StructType().add("c1", IntegerType, nullable = true, withId(1))
-      val data = (1 to 4).map(i => Row(i))
-
-      withCpuSparkSession(
-        spark => spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
-            .write.mode("overwrite").parquet(tmpFile.getAbsolutePath)
-      )
-
-      assertThrows[IllegalArgumentException] {
-        withGpuSparkSession(
-          spark => spark.read.parquet(tmpFile.getAbsolutePath).collect(),
-          // not support read field id
-          new SparkConf().set("spark.sql.parquet.fieldId.read.enabled", "true")
-        )
-      }
-    } finally {
-      tmpFile.delete()
-    }
-  }
-}

From 9ba637e41b5e5888a9a75471af0e565f755975b9 Mon Sep 17 00:00:00 2001
From: Chong Gao
Date: Wed, 2 Mar 2022 17:26:03 +0800
Subject: [PATCH 4/5] Refactor

---
 .../nvidia/spark/rapids/shims/v2/ParquetFieldIdShims.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/sql-plugin/src/main/330+/scala/com/nvidia/spark/rapids/shims/v2/ParquetFieldIdShims.scala b/sql-plugin/src/main/330+/scala/com/nvidia/spark/rapids/shims/v2/ParquetFieldIdShims.scala
index 2305765959d..8b1e1f2297e 100644
--- a/sql-plugin/src/main/330+/scala/com/nvidia/spark/rapids/shims/v2/ParquetFieldIdShims.scala
+++ b/sql-plugin/src/main/330+/scala/com/nvidia/spark/rapids/shims/v2/ParquetFieldIdShims.scala
@@ -41,8 +41,8 @@ object ParquetFieldIdShims {
 
   def tagGpuSupportReadForFieldId(meta: RapidsMeta[_, _, _], conf: SQLConf): Unit = {
     if(conf.parquetFieldIdReadEnabled) {
-      meta.willNotWorkOnGpu("reading by Parquet field ID is not supported, " +
-        "SQLConf.PARQUET_FIELD_ID_READ_ENABLED is true")
+      meta.willNotWorkOnGpu(s"reading by Parquet field ID is not supported, " +
+        s"${SQLConf.PARQUET_FIELD_ID_READ_ENABLED.key} is true")
     }
   }
 }

From 8fbb990ad5228d053e549ae90f47d2b574674a42 Mon Sep 17 00:00:00 2001
From: Chong Gao
Date: Wed, 2 Mar 2022 17:33:06 +0800
Subject: [PATCH 5/5] Revert one empty line

---
 .../scala/com/nvidia/spark/rapids/GpuParquetFileFormat.scala | 1 +
 1 file changed, 1 insertion(+)

diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetFileFormat.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetFileFormat.scala
index a142d25c1e5..3885917edfe 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetFileFormat.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetFileFormat.scala
@@ -42,6 +42,7 @@ object GpuParquetFileFormat {
     spark: SparkSession,
     options: Map[String, String],
     schema: StructType): Option[GpuParquetFileFormat] = {
+
     val sqlConf = spark.sessionState.conf
 
     ParquetFieldIdShims.tagGpuSupportWriteForFieldId(meta, schema, sqlConf)
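
Illustration appended after the series (not part of the patches): a minimal PySpark sketch of the fallback behavior these commits add. Only the two spark.sql.parquet.fieldId.* config keys, the 'parquet.field.id' metadata key, and the DataWritingCommandExec / FileSourceScanExec fallbacks come from the diffs above; the session wiring, plugin class name, and file path below are assumptions.

    # field_id_fallback_demo.py -- hypothetical repro, assuming Spark 3.3.0+
    # with the RAPIDS Accelerator jar on the classpath.
    from pyspark.sql import SparkSession
    from pyspark.sql.types import StructType, StructField, IntegerType

    spark = (SparkSession.builder
             .appName("field-id-fallback-demo")
             .config("spark.plugins", "com.nvidia.spark.SQLPlugin")
             .getOrCreate())

    # A schema that carries a Parquet field ID, as in the integration tests above.
    schema = StructType([
        StructField("c1", IntegerType(), metadata={'parquet.field.id': 1}),
    ])
    df = spark.createDataFrame([(1,), (2,), (3,)], schema)

    # With spark.sql.parquet.fieldId.write.enabled=true (the Spark 3.3 default),
    # tagGpuSupportWriteForFieldId marks the write willNotWorkOnGpu, so the
    # DataWritingCommandExec stays on the CPU instead of running on the GPU.
    df.coalesce(1).write.mode("overwrite").parquet("/tmp/field_id_demo")

    # Reading by field ID is tagged the same way: while this flag is set
    # (it defaults to false), the FileSourceScanExec also stays on the CPU.
    spark.conf.set("spark.sql.parquet.fieldId.read.enabled", "true")
    spark.read.parquet("/tmp/field_id_demo").show()

The split mirrors the shim layout above: on builds before Spark 3.3 the tag methods are no-ops, so only 3.3+ sessions can hit these fallbacks.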