Disable write/read Parquet when Parquet field IDs are used #4882

Merged (5 commits) on Mar 7, 2022
22 changes: 21 additions & 1 deletion integration_tests/src/main/python/parquet_test.py
@@ -768,4 +768,24 @@ def test_parquet_read_with_corrupt_files(spark_tmp_path, reader_confs, v1_enable

    assert_gpu_and_cpu_are_equal_collect(
            lambda spark: spark.read.parquet(first_data_path, second_data_path, third_data_path),
            conf=all_confs)

# should fall back when trying to read with field IDs
@pytest.mark.skipif(is_before_spark_330(), reason='Field ID is not supported before Spark 330')
@allow_non_gpu("FileSourceScanExec", "ColumnarToRowExec")
def test_parquet_read_field_id(spark_tmp_path):
    data_path = spark_tmp_path + '/PARQUET_DATA'
    schema = StructType([
        StructField("c1", IntegerType(), metadata={'parquet.field.id': 1}),
    ])
    data = [(1,), (2,), (3,)]
    # write parquet with field IDs
    with_cpu_session(lambda spark: spark.createDataFrame(data, schema).coalesce(1).write.mode("overwrite").parquet(data_path))

    readSchema = StructType([
        StructField("mapped_name_xxx", IntegerType(), metadata={'parquet.field.id': 1}),
    ])
    assert_gpu_fallback_collect(
        lambda spark: spark.read.schema(readSchema).parquet(data_path),
        'FileSourceScanExec',
        {"spark.sql.parquet.fieldId.read.enabled": "true"})  # default is false
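For reference, the read-side fallback exercised above is driven entirely by the Spark 3.3+ config spark.sql.parquet.fieldId.read.enabled (false by default). A minimal sketch of how the same behavior surfaces outside the test harness, assuming a Spark 3.3+ session with the RAPIDS plugin on the classpath; the app name and data path below are placeholders, not part of this PR:

from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, StructField, StructType

# Illustrative only: the app name and path are placeholders.
spark = (SparkSession.builder
    .appName("field-id-read-example")
    # true requests schema resolution by Parquet field ID; with this PR the plugin
    # then falls back to FileSourceScanExec on the CPU. Leaving it at the default
    # (false) keeps the scan eligible for the GPU.
    .config("spark.sql.parquet.fieldId.read.enabled", "true")
    .getOrCreate())

read_schema = StructType([
    StructField("mapped_name_xxx", IntegerType(), metadata={'parquet.field.id': 1}),
])

df = spark.read.schema(read_schema).parquet("/tmp/parquet_with_field_ids")
df.explain()  # the scan stays on the CPU while the config above is true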
18 changes: 17 additions & 1 deletion integration_tests/src/main/python/parquet_write_test.py
@@ -19,7 +19,7 @@
from data_gen import *
from marks import *
from pyspark.sql.types import *
from spark_session import with_cpu_session, with_gpu_session
from spark_session import with_cpu_session, with_gpu_session, is_before_spark_330
import pyspark.sql.functions as f
import pyspark.sql.utils
import random
@@ -402,3 +402,19 @@ def create_empty_df(spark, path):
        lambda spark, path: spark.read.parquet(path),
        data_path,
        conf=writer_confs)

# should fall back when trying to write field ID metadata
@pytest.mark.skipif(is_before_spark_330(), reason='Field ID is not supported before Spark 330')
@allow_non_gpu('DataWritingCommandExec')
def test_parquet_write_field_id(spark_tmp_path):
    data_path = spark_tmp_path + '/PARQUET_DATA'
    schema = StructType([
        StructField("c1", IntegerType(), metadata={'parquet.field.id': 1}),
    ])
    data = [(1,), (2,), (3,)]
    assert_gpu_fallback_write(
        lambda spark, path: spark.createDataFrame(data, schema).coalesce(1).write.mode("overwrite").parquet(path),
        lambda spark, path: spark.read.parquet(path),
        data_path,
        'DataWritingCommandExec',
        conf={"spark.sql.parquet.fieldId.write.enabled": "true"})  # default is true
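The write-side fallback only triggers when spark.sql.parquet.fieldId.write.enabled is true (its default) and the schema actually carries parquet.field.id metadata, so a schema without field IDs keeps the write on the GPU. As a rough sketch, not part of the plugin, a flat schema can be rebuilt without the IDs before writing:

from pyspark.sql.types import StructField, StructType

def strip_field_ids(schema):
    # Hypothetical helper: rebuild a StructType without the 'parquet.field.id'
    # metadata key so the write no longer trips the field ID check.
    # Flat schemas only; nested structs/arrays/maps would need recursion.
    fields = []
    for f in schema.fields:
        md = {k: v for k, v in (f.metadata or {}).items() if k != 'parquet.field.id'}
        fields.append(StructField(f.name, f.dataType, f.nullable, md))
    return StructType(fields)

Setting spark.sql.parquet.fieldId.write.enabled to false has the same effect for the whole session, at the cost of dropping the IDs from every Parquet write.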
@@ -16,13 +16,24 @@

package com.nvidia.spark.rapids.shims.v2

import com.nvidia.spark.rapids.RapidsMeta
import org.apache.hadoop.conf.Configuration

import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType

object ParquetFieldIdShims {
  /** Updates the Hadoop configuration with the Parquet field ID write setting from SQLConf */
  def setupParquetFieldIdWriteConfig(conf: Configuration, sqlConf: SQLConf): Unit = {
    // Parquet field ID support configs are not supported until Spark 3.3
  }

  def tagGpuSupportWriteForFieldId(meta: RapidsMeta[_, _, _], schema: StructType,
      conf: SQLConf): Unit = {
    // Parquet field ID support configs are not supported until Spark 3.3
  }

  def tagGpuSupportReadForFieldId(meta: RapidsMeta[_, _, _], conf: SQLConf): Unit = {
    // Parquet field ID support configs are not supported until Spark 3.3
  }
}
@@ -16,9 +16,12 @@

package com.nvidia.spark.rapids.shims.v2

import com.nvidia.spark.rapids.RapidsMeta
import org.apache.hadoop.conf.Configuration

import org.apache.spark.sql.execution.datasources.parquet.ParquetUtils
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType

object ParquetFieldIdShims {
  /** Updates the Hadoop configuration with the Parquet field ID write setting from SQLConf */
@@ -27,4 +30,19 @@ object ParquetFieldIdShims {
      SQLConf.PARQUET_FIELD_ID_WRITE_ENABLED.key,
      sqlConf.parquetFieldIdWriteEnabled.toString)
  }

  def tagGpuSupportWriteForFieldId(meta: RapidsMeta[_, _, _], schema: StructType,
      conf: SQLConf): Unit = {
    if (conf.parquetFieldIdWriteEnabled && ParquetUtils.hasFieldIds(schema)) {
      meta.willNotWorkOnGpu(
        "field IDs are not supported for Parquet writes, schema is " + schema.json)
    }
  }

  def tagGpuSupportReadForFieldId(meta: RapidsMeta[_, _, _], conf: SQLConf): Unit = {
    if (conf.parquetFieldIdReadEnabled) {
      meta.willNotWorkOnGpu(s"reading by Parquet field ID is not supported, " +
        s"${SQLConf.PARQUET_FIELD_ID_READ_ENABLED.key} is true")
    }
  }
}
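On the write path, the Spark 3.3 shim above only reports the fallback when ParquetUtils.hasFieldIds(schema) is true, that is, when the schema carries at least one parquet.field.id, as the integration tests set up. As a rough Python approximation of that kind of recursive check (an illustration, not the Spark implementation), using the same metadata key the tests use:

from pyspark.sql.types import ArrayType, MapType, StructType

FIELD_ID_KEY = 'parquet.field.id'

def schema_has_field_ids(dt):
    # Approximation of the recursive check: does any field, at any nesting
    # level, carry 'parquet.field.id' metadata?
    if isinstance(dt, StructType):
        return any(FIELD_ID_KEY in (f.metadata or {}) or schema_has_field_ids(f.dataType)
                   for f in dt.fields)
    if isinstance(dt, ArrayType):
        return schema_has_field_ids(dt.elementType)
    if isinstance(dt, MapType):
        return schema_has_field_ids(dt.keyType) or schema_has_field_ids(dt.valueType)
    return False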
@@ -44,6 +44,9 @@ object GpuParquetFileFormat {
      schema: StructType): Option[GpuParquetFileFormat] = {

    val sqlConf = spark.sessionState.conf

    ParquetFieldIdShims.tagGpuSupportWriteForFieldId(meta, schema, sqlConf)

    val parquetOptions = new ParquetOptions(options, sqlConf)

    if (!meta.conf.isParquetEnabled) {
@@ -33,6 +33,7 @@ import com.nvidia.spark.RebaseHelper
import com.nvidia.spark.rapids.GpuMetric._
import com.nvidia.spark.rapids.ParquetPartitionReader.CopyRange
import com.nvidia.spark.rapids.RapidsPluginImplicits._
import com.nvidia.spark.rapids.shims.v2.ParquetFieldIdShims
import org.apache.commons.io.output.{CountingOutputStream, NullOutputStream}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FSDataInputStream, Path}
@@ -143,6 +144,8 @@ object GpuParquetScanBase {
      meta: RapidsMeta[_, _, _]): Unit = {
    val sqlConf = sparkSession.conf

    ParquetFieldIdShims.tagGpuSupportReadForFieldId(meta, sparkSession.sessionState.conf)

    if (!meta.conf.isParquetEnabled) {
      meta.willNotWorkOnGpu("Parquet input and output has been disabled. To enable set " +
        s"${RapidsConf.ENABLE_PARQUET} to true")