Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support DayTimeIntervalType in ParquetCachedBatchSerializer[databricks] #4926

Merged
merged 5 commits into from
Mar 15, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions integration_tests/src/main/python/asserts.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,11 @@ def assert_equal(cpu, gpu):
print("GPU OUTPUT: %s" % gpu)
raise

def assert_collection_equal_ignore_order(cpu, gpu):
sorted_cpu = cpu.sort(key=_RowCmp)
sorted_gpu = gpu.sort(key=_RowCmp)
assert_equal(sorted_cpu, sorted_gpu)

def _has_incompat_conf(conf):
return ('spark.rapids.sql.incompatibleOps.enabled' in conf and
conf['spark.rapids.sql.incompatibleOps.enabled'].lower() == 'true')
Expand Down
63 changes: 61 additions & 2 deletions integration_tests/src/main/python/cache_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@

import pytest

from asserts import assert_gpu_and_cpu_are_equal_collect, assert_equal
from asserts import assert_gpu_and_cpu_are_equal_collect, assert_equal, assert_collection_equal_ignore_order
from data_gen import *
import pyspark.sql.functions as f
from spark_session import with_cpu_session, with_gpu_session
from spark_session import with_cpu_session, with_gpu_session, is_before_spark_330
from join_test import create_df
from marks import incompat, allow_non_gpu, ignore_order

Expand Down Expand Up @@ -285,3 +285,62 @@ def helper(spark):
return df.selectExpr("a")

assert_gpu_and_cpu_are_equal_collect(helper)

razajafri marked this conversation as resolved.
Show resolved Hide resolved

@pytest.mark.skipif(is_before_spark_330(), reason='DayTimeInterval is not supported before Spark3.3.0')
@ignore_order(local=True)
def test_cache_daytimeinterval_input_row():
assert_gpu_and_cpu_are_equal_collect(
lambda spark: three_col_df(spark,
DayTimeIntervalGen(), int_gen, null_gen).cache().selectExpr('b', 'a'))


@pytest.mark.skipif(is_before_spark_330(), reason='DayTimeInterval is not supported before Spark3.3.0')
@allow_non_gpu("FileSourceScanExec", "ColumnarToRowExec")
@pytest.mark.parametrize('alongside_gen', [int_gen, ArrayGen(int_gen)], ids=idfn)
@pytest.mark.parametrize('with_rapids_memoryscan', ['true', 'false'],
ids=["rapids_memoryscan_on", "rapids_memoryscan_off"])
@pytest.mark.parametrize('with_rapids_reader', ['true', 'false'],
ids=["rapids_reader_on", "rapids_reader_off"])
def test_cache_daytimeinterval_input_columnar(spark_tmp_path, alongside_gen,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this test method so complicated? What are we doing in this test that we aren't doing in any other test? We have a DF that we cache and then pull it from the cache, compare the result from the CPU and GPU

In other words, why don't we just do the following?

def test_cache_daytimeinterval_input_columnar():
    def func(spark):
        df = two_col_df(spark, DayTimeIntervalGen(), alongside_gen)
        df.cache().count()
        return df.selectExpr("a") // or whatever the column name is

    assert_gpu_and_cpu_are_equal_collect(func, conf={YOUR CONF})

I could be missing something

Copy link
Collaborator Author

@firestarman firestarman Mar 11, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought since we implement all the paths (both CPU and GPU) in the PCBS, I think the output for all of the paths should be equal to the original data.
Comparing the GPU output to CPU output bases on the CPU output of the PCBS is reliable and equal to the Spark output.
Maybe it is over designed.

Copy link
Collaborator Author

@firestarman firestarman Mar 11, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reading data from parquet and writing it back is to get a columnar input to be cached and then convert the cached batches to columnar batches.
I do the same as your suggestion in the test_cache_daytimeinterval_input_row test, it only checks the paths of conversion between internal rows and cached batches.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Simplified this test

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reading/writing to parquet isn't necessary to write/read columnar cache.

If you look at InMemoryTableScanExec it calls the convertColumnarBatchToCachedBatch if the spark plan supports columnar input and if the serializer supports columnar input (which it always does here)

Reading cache columnar in PCBS depends on three variables, whether the conf spark.sql.inMemoryColumnarStorage.enableVectorizedReader is enabled and the plan has 100 or less columns and the plan's output is AtomicType or NullType

Copy link
Collaborator Author

@firestarman firestarman Mar 15, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good to know. Just got home and updated it.
Merged the two tests into one and updated the test function as your suggestion above.

with_rapids_memoryscan, with_rapids_reader):
# 'alongside_gen' and 'with_rapids_memoryscan' are used to cover the two output paths
# (columnar or row) for columnar input.
# 'rapids_reader_on' is used to cover the path of copying spark columns to rapids columns.
tmp_data_path = spark_tmp_path + '/PARQUET_DATA'
# create the test data
with_cpu_session(
lambda spark: two_col_df(spark,
DayTimeIntervalGen(), alongside_gen).write.mode("overwrite").parquet(tmp_data_path))
# build the config, copied some of them from 'test_cache_columnar', but updated them
# according to the warning message:
# WARN SQLConf: The SQL config 'spark.sql.legacy.parquet.datetimeRebaseModeInWrite'
# has been deprecated in Spark v3.2 and may be removed in the future.
# Use 'spark.sql.parquet.datetimeRebaseModeInWrite' instead.
test_conf = {
# rapids-spark doesn't support LEGACY read for parquet, also set the int96 rebase mode
# values because LEGACY in databricks which will preclude this op from running on GPU.
'spark.sql.parquet.datetimeRebaseModeInWrite': 'CORRECTED',
'spark.sql.parquet.datetimeRebaseModeInRead' : 'CORRECTED',
'spark.sql.parquet.int96RebaseModeInWrite': 'CORRECTED',
'spark.sql.parquet.int96RebaseModeInRead' : 'CORRECTED',
'spark.rapids.sql.format.parquet.read.enabled': with_rapids_reader,
'spark.rapids.sql.exec.InMemoryTableScanExec': with_rapids_memoryscan,
"spark.sql.inMemoryColumnarStorage.enableVectorizedReader": "true"}
# try to let the whole cache process is columnar for CPU memory table scan. Since collecting
# the result will always lead to converting cached data to internal rows, not column batches.
def test_func(spark, out_path):
spark.read.parquet(tmp_data_path).cache()\
.selectExpr('a', 'b').write.parquet(spark_tmp_path + out_path),
with_cpu_session(lambda spark: test_func(spark, "/out_cpu"), test_conf)
with_gpu_session(lambda spark: test_func(spark, "/out_gpu"), test_conf)

# both CPU and GPU result should be equal to the original data
def assert_output(spark):
original = spark.read.parquet(tmp_data_path).collect()
cpu_out = spark.read.parquet(spark_tmp_path + "/out_cpu").collect()
gpu_out = spark.read.parquet(spark_tmp_path + "/out_gpu").collect()
assert_collection_equal_ignore_order(original, cpu_out)
assert_collection_equal_ignore_order(original, gpu_out)

with_cpu_session(assert_output)
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import ai.rapids.cudf.DType
import com.nvidia.spark.rapids.GpuRowToColumnConverter.TypeConverter

import org.apache.spark.sql.types.DataType
import org.apache.spark.sql.vectorized.ColumnVector

object GpuTypeShims {

Expand Down Expand Up @@ -46,4 +47,19 @@ object GpuTypeShims {
* @return the cuDF type if the Shim supports
*/
def toRapidsOrNull(t: DataType): DType = null

/** Whether the Shim supports columnar copy for the given type */
def isColumnarCopySupportedForType(colType: DataType): Boolean = false

/**
* Copy a column for computing on GPU.
* Better to check if the type is supported first by calling 'isColumnarCopySupportedForType'
*/
def columnarCopy(cv: ColumnVector,
b: ai.rapids.cudf.HostColumnVector.ColumnBuilder, rows: Int): Unit = {
val t = cv.dataType()
throw new UnsupportedOperationException(s"Converting to GPU for $t is not supported yet")
}

def isParquetColumnarWriterSupportedForType(colType: DataType): Boolean = false
}
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,7 @@ protected class ParquetCachedBatchSerializer extends GpuCachedBatchSerializer wi
case TimestampType | StringType | BooleanType | DateType | BinaryType |
DoubleType | FloatType | ByteType | IntegerType | LongType | ShortType => true
case _: DecimalType => true
case other if GpuTypeShims.isParquetColumnarWriterSupportedForType(other) => true
case _ => false
}
}
Expand Down Expand Up @@ -334,10 +335,14 @@ protected class ParquetCachedBatchSerializer extends GpuCachedBatchSerializer wi
isSchemaSupportedByCudf(schema)) {
def putOnGpuIfNeeded(batch: ColumnarBatch): ColumnarBatch = {
if (!batch.column(0).isInstanceOf[GpuColumnVector]) {
val s: StructType = structSchema
val gpuCB = new GpuColumnarBatchBuilder(s, batch.numRows()).build(batch.numRows())
batch.close()
gpuCB
// The input batch from CPU must NOT be closed, because the columns inside it
// will be reused, and Spark expects the producer to close its batches.
val numRows = batch.numRows()
val gcbBuilder = new GpuColumnarBatchBuilder(structSchema, numRows)
for (i <- 0 until batch.numCols()) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can use the following to improve the performance:

var rowIndex = 0
while (rowIndex < batch.numRows()) {

    ......
    rowIndex += 1
}

A similar PR: #4770

Copy link
Collaborator Author

@firestarman firestarman Mar 14, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Number of rows is always quite large, so this change can improve some performance. However here is for columns, this suggestion will get little benfit for performance, since number of columns is usually small.

gcbBuilder.copyColumnar(batch.column(i), i, structSchema(i).nullable, numRows)
}
gcbBuilder.build(numRows)
} else {
batch
}
Expand Down Expand Up @@ -1038,7 +1043,7 @@ protected class ParquetCachedBatchSerializer extends GpuCachedBatchSerializer wi
if (!cbIter.hasNext) {
Iterator.empty
} else {
new CurrentBatchIterator(cbIter.next().asInstanceOf[ParquetCachedBatch])
new CurrentBatchIterator(cbIter.next().asInstanceOf[ParquetCachedBatch])
}
}

Expand Down Expand Up @@ -1434,6 +1439,9 @@ protected class ParquetCachedBatchSerializer extends GpuCachedBatchSerializer wi

ParquetWriteSupport.setSchema(requestedSchema, hadoopConf)

// From 3.3.0, Spark will check this filed ID config
ParquetFieldIdShims.setupParquetFieldIdWriteConfig(hadoopConf, sqlConf)

hadoopConf
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@
package com.nvidia.spark.rapids.shims

import ai.rapids.cudf.DType
import com.nvidia.spark.rapids.ColumnarCopyHelper
import com.nvidia.spark.rapids.GpuRowToColumnConverter.{LongConverter, NotNullLongConverter, TypeConverter}

import org.apache.spark.sql.types.{DataType, DayTimeIntervalType}
import org.apache.spark.sql.vectorized.ColumnVector

/**
* Spark stores ANSI YearMonthIntervalType as int32 and ANSI DayTimeIntervalType as int64
Expand Down Expand Up @@ -93,4 +95,27 @@ object GpuTypeShims {
null
}
}

/** Whether the Shim supports columnar copy for the given type */
def isColumnarCopySupportedForType(colType: DataType): Boolean = colType match {
case DayTimeIntervalType(_, _) => true
case _ => false
}

/**
* Copy a column for computing on GPU.
* Better to check if the type is supported first by calling 'isColumnarCopySupportedForType'
*/
def columnarCopy(cv: ColumnVector,
b: ai.rapids.cudf.HostColumnVector.ColumnBuilder, rows: Int): Unit = cv.dataType() match {
case DayTimeIntervalType(_, _) =>
ColumnarCopyHelper.longCopy(cv, b, rows)
case t =>
throw new UnsupportedOperationException(s"Converting to GPU for $t is not supported yet")
}

def isParquetColumnarWriterSupportedForType(colType: DataType): Boolean = colType match {
case DayTimeIntervalType(_, _) => true
case _ => false
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.nvidia.spark.rapids.shims

import com.nvidia.spark.InMemoryTableScanMeta
import com.nvidia.spark.rapids._
import org.apache.parquet.schema.MessageType

Expand All @@ -25,6 +26,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Coalesce, DynamicPruningExpression, Expression, MetadataAttribute, TimeAdd}
import org.apache.spark.sql.catalyst.json.rapids.shims.Spark33XFileOptionsShims
import org.apache.spark.sql.execution.{BaseSubqueryExec, CoalesceExec, FileSourceScanExec, InSubqueryExec, ProjectExec, ReusedSubqueryExec, SparkPlan, SubqueryBroadcastExec}
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
import org.apache.spark.sql.execution.command.DataWritingCommandExec
import org.apache.spark.sql.execution.datasources.{DataSourceUtils, FilePartition, FileScanRDD, HadoopFsRelation, PartitionedFile}
import org.apache.spark.sql.execution.datasources.parquet.ParquetFilters
Expand Down Expand Up @@ -261,6 +263,11 @@ trait Spark33XShims extends Spark33XFileOptionsShims {
wrapped.disableBucketedScan)(conf)
}
}),
GpuOverrides.exec[InMemoryTableScanExec](
"Implementation of InMemoryTableScanExec to use GPU accelerated Caching",
// NullType is actually supported
ExecChecks(TypeSig.commonCudfTypesWithNested + TypeSig.DAYTIME, TypeSig.all),
(scan, conf, p, r) => new InMemoryTableScanMeta(scan, conf, p, r)),
GpuOverrides.exec[ProjectExec](
"The backend for most select, withColumn and dropColumn statements",
ExecChecks(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import java.nio.ByteBuffer
import scala.collection.JavaConverters._
import scala.collection.mutable

import com.nvidia.spark.rapids.shims.{ShimUnaryExecNode, SparkShimImpl}
import com.nvidia.spark.rapids.shims.{GpuTypeShims, ShimUnaryExecNode, SparkShimImpl}
import org.apache.arrow.memory.ReferenceManager
import org.apache.arrow.vector.ValueVector

Expand Down Expand Up @@ -148,6 +148,8 @@ object HostColumnarToGpu extends Logging {
ColumnarCopyHelper.decimal128Copy(cv, b, rows, dt.precision, dt.scale)
}
}
case other if GpuTypeShims.isColumnarCopySupportedForType(other) =>
GpuTypeShims.columnarCopy(cv, b, rows)
case t =>
throw new UnsupportedOperationException(
s"Converting to GPU for $t is not currently supported")
Expand Down