CreateMap support for multiple key-value pairs #3251

Merged: 12 commits, Aug 25, 2021
1 change: 1 addition & 0 deletions docs/configs.md
@@ -65,6 +65,7 @@ Name | Description | Default Value
<a name="sql.castStringToFloat.enabled"></a>spark.rapids.sql.castStringToFloat.enabled|When set to true, enables casting from strings to float types (float, double) on the GPU. Currently hex values aren't supported on the GPU. Also note that casting from string to float types on the GPU returns incorrect results when the string represents any number "1.7976931348623158E308" <= x < "1.7976931348623159E308" and "-1.7976931348623158E308" >= x > "-1.7976931348623159E308" in both these cases the GPU returns Double.MaxValue while CPU returns "+Infinity" and "-Infinity" respectively|false
<a name="sql.castStringToTimestamp.enabled"></a>spark.rapids.sql.castStringToTimestamp.enabled|When set to true, casting from string to timestamp is supported on the GPU. The GPU only supports a subset of formats when casting strings to timestamps. Refer to the CAST documentation for more details.|false
<a name="sql.concurrentGpuTasks"></a>spark.rapids.sql.concurrentGpuTasks|Set the number of tasks that can execute concurrently per GPU. Tasks may temporarily block when the number of concurrent tasks in the executor exceeds this amount. Allowing too many concurrent tasks on the same GPU may lead to GPU out of memory errors.|1
<a name="sql.createMap.enabled"></a>spark.rapids.sql.createMap.enabled|When set to true, support the CreateMap expression on the GPU with multiple key-value pairs where the keys are not literal values. The GPU version does not detect duplicate keys or make any guarantees about which key wins if there are duplicates in this case. CreateMap is always supported on the GPU when there is a single key-value pair or when there are multiple key-value pairs with literal keys.|false
<a name="sql.csv.read.bool.enabled"></a>spark.rapids.sql.csv.read.bool.enabled|Parsing an invalid CSV boolean value produces true instead of null|false
<a name="sql.csv.read.byte.enabled"></a>spark.rapids.sql.csv.read.byte.enabled|Parsing CSV bytes is much more lenient and will return 0 for some malformed values instead of null|false
<a name="sql.csv.read.date.enabled"></a>spark.rapids.sql.csv.read.date.enabled|Parsing invalid CSV dates produces different results from Spark|false
2 changes: 1 addition & 1 deletion docs/supported_ops.md
@@ -4439,7 +4439,7 @@ Accelerator support is described below.
<td> </td>
<td> </td>
<td><em>PS<br/>max nested DECIMAL precision of 18;<br/>UTC is only supported TZ for nested TIMESTAMP</em></td>
-<td> </td>
+<td><em>PS<br/>max nested DECIMAL precision of 18;<br/>UTC is only supported TZ for nested TIMESTAMP</em></td>
<td><em>PS<br/>max nested DECIMAL precision of 18;<br/>UTC is only supported TZ for nested TIMESTAMP</em></td>
<td> </td>
</tr>
34 changes: 30 additions & 4 deletions integration_tests/src/main/python/map_test.py
@@ -39,21 +39,47 @@ def test_simple_get_map_value(data_gen):
'a["key_5"]'))

@pytest.mark.parametrize('key_gen', [StringGen(nullable=False), IntegerGen(nullable=False), basic_struct_gen], ids=idfn)
-@pytest.mark.parametrize('value_gen', [StringGen(nullable=False), IntegerGen(nullable=False), basic_struct_gen], ids=idfn)
+@pytest.mark.parametrize('value_gen', [StringGen(nullable=True), IntegerGen(nullable=True), basic_struct_gen], ids=idfn)
def test_single_entry_map(key_gen, value_gen):
    data_gen = [('a', key_gen), ('b', value_gen)]
    assert_gpu_and_cpu_are_equal_collect(
        lambda spark : gen_df(spark, data_gen).selectExpr(
            'map("literal_key", b) as map1',
            'map(a, b) as map2'))

@allow_non_gpu('ProjectExec,Alias,CreateMap')
# until https://github.com/NVIDIA/spark-rapids/issues/3229 is implemented
def test_map_expr_no_pairs():
    data_gen = [('a', StringGen(nullable=False)), ('b', StringGen(nullable=False))]
    assert_gpu_and_cpu_are_equal_collect(
        lambda spark : gen_df(spark, data_gen).selectExpr(
            'map() as m1'))

def test_map_expr_multiple_pairs():
    # we don't hit duplicate keys in this test due to the high cardinality of the generated strings
    data_gen = [('a', StringGen(nullable=False)), ('b', StringGen(nullable=False))]
    assert_gpu_and_cpu_are_equal_collect(
        lambda spark : gen_df(spark, data_gen).selectExpr(
            'map("key1", b, "key2", a) as m1',
            'map(a, b, b, a) as m2'),
        conf={'spark.rapids.sql.createMap.enabled':True})

@allow_non_gpu('ProjectExec,Alias,CreateMap,Literal')
def test_map_expr_dupe_keys_fallback():
    data_gen = [('a', StringGen(nullable=False)), ('b', StringGen(nullable=False))]
    assert_gpu_fallback_collect(
        lambda spark : gen_df(spark, data_gen).selectExpr(
            'map("key1", b, "key1", a) as m1'),
        'ProjectExec',
        conf={'spark.rapids.sql.createMap.enabled':True,
              'spark.sql.mapKeyDedupPolicy':'LAST_WIN'})

@allow_non_gpu('ProjectExec,Alias,CreateMap,Literal')
def test_map_expr_multi_non_literal_keys_fallback():
    data_gen = [('a', StringGen(nullable=False)), ('b', StringGen(nullable=False))]
    assert_gpu_fallback_collect(
        lambda spark : gen_df(spark, data_gen).selectExpr(
-            "map(a, b, b, a) as m1"), 'ProjectExec')
+            'map(a, b, b, a) as m1'),
+        'ProjectExec',
+        conf={'spark.rapids.sql.createMap.enabled':False})

def test_map_scalar_project():
    assert_gpu_and_cpu_are_equal_collect(
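For context on the duplicate-key fallback tested above: Spark's CPU behavior for duplicate map keys is governed by spark.sql.mapKeyDedupPolicy, a guarantee the GPU version cannot honor. A small sketch of the CPU semantics (hypothetical values; standard Spark APIs only):

spark.conf.set("spark.sql.mapKeyDedupPolicy", "LAST_WIN")
val m = spark.sql("SELECT map('key1', 'a', 'key1', 'b') AS m").head.getMap[String, String](0)
// m == Map(key1 -> b): with LAST_WIN the last value for a duplicate key wins on the CPU,
// which is exactly the ordering guarantee the GPU implementation does not make.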
11 changes: 11 additions & 0 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala
@@ -637,6 +637,15 @@ object RapidsConf {
    .booleanConf
    .createWithDefault(false)

  val ENABLE_CREATE_MAP = conf("spark.rapids.sql.createMap.enabled")
    .doc("When set to true, support the CreateMap expression on the GPU with multiple " +
"key-value pairs where the keys are not literal values. The GPU version does not detect " +
"duplicate keys or make any guarantees about which key wins if there are duplicates in " +
"this case. CreateMap is always supported on the GPU when there is a single key-value " +
"pair or when there are multiple key-value pairs with literal keys.")
.booleanConf
.createWithDefault(false)

  val ENABLE_INNER_JOIN = conf("spark.rapids.sql.join.inner.enabled")
    .doc("When set to true inner joins are enabled on the GPU")
    .booleanConf
@@ -1539,6 +1548,8 @@ class RapidsConf(conf: Map[String, String]) extends Logging {

  lazy val isProjectAstEnabled: Boolean = get(ENABLE_PROJECT_AST)

  lazy val isCreateMapEnabled: Boolean = get(ENABLE_CREATE_MAP)

  lazy val isParquetEnabled: Boolean = get(ENABLE_PARQUET)

  lazy val isParquetInt96WriteEnabled: Boolean = get(ENABLE_PARQUET_INT96_WRITE)
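For reference, a minimal sketch of how the new flag surfaces through this class (hypothetical direct construction, bypassing the Spark session; RapidsConf is built from a plain key-to-value map as shown in the class signature above):

// Hypothetical standalone construction of the config holder:
val rapidsConf = new RapidsConf(Map("spark.rapids.sql.createMap.enabled" -> "true"))
assert(rapidsConf.isCreateMapEnabled) // resolves via get(ENABLE_CREATE_MAP)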
37 changes: 28 additions & 9 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/TypeChecks.scala
@@ -20,10 +20,12 @@ import java.io.{File, FileOutputStream}
import java.time.ZoneId

import ai.rapids.cudf.DType
import scala.collection.mutable

-import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, UnaryExpression, WindowSpecDefinition}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, Literal, UnaryExpression, WindowSpecDefinition}
import org.apache.spark.sql.types._


/**
 * The level of support that the plugin has for a given type. Used for documentation generation.
 */
@@ -1025,25 +1027,42 @@ object WindowSpecCheck extends ExprChecks {

object CreateMapCheck extends ExprChecks {

-  // Spark supports all types except for Map for key and value (Map is not supported
+  // Spark supports all types except for Map for key (Map is not supported
  // even in nested types)
-  val keyValueSig: TypeSig = (TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL_64 +
+  private val keySig: TypeSig = (TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL_64 +
    TypeSig.ARRAY + TypeSig.STRUCT).nested()

+  private val valueSig: TypeSig = (TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL_64 +
+    TypeSig.ARRAY + TypeSig.MAP + TypeSig.STRUCT).nested()

  override def tag(meta: RapidsMeta[_, _, _]): Unit = {
-    if (meta.childExprs.length != 2) {
-      // See https://github.com/NVIDIA/spark-rapids/issues/3229
-      meta.willNotWorkOnGpu("CreateMap only supports two expressions on GPU")
+    // if there are more than two key-value pairs then there is the possibility of duplicate keys
+    if (meta.childExprs.length > 2) {
+      // check for duplicate keys if the keys are literal values
+      val keyExprs = meta.childExprs.indices.filter(_ % 2 == 0).map(meta.childExprs)
+      if (keyExprs.forall(_.wrapped.isInstanceOf[Literal])) {
+        val keys = keyExprs.map(_.wrapped.asInstanceOf[Literal].value)
+        val uniqueKeys = new mutable.HashSet[Any]()
+        for (key <- keys) {
+          if (!uniqueKeys.add(key)) {
+            meta.willNotWorkOnGpu("CreateMap with duplicate literal keys is not supported")
+          }
+        }
+      } else if (!meta.conf.isCreateMapEnabled) {
+        meta.willNotWorkOnGpu("CreateMap is not enabled by default when there are " +
+          "multiple key-value pairs and where the keys are not literal values because handling " +
+          "of duplicate keys is not compatible with Spark. " +
+          s"Set ${RapidsConf.ENABLE_CREATE_MAP}=true to enable it anyway.")
+      }
    }
  }

  override def support(
      dataType: TypeEnum.Value): Map[ExpressionContext, Map[String, SupportLevel]] = {
-    val support = keyValueSig.getSupportLevel(dataType, keyValueSig)
    Map((ProjectExprContext,
      Map(
-        ("key", support),
-        ("value", support))))
+        ("key", keySig.getSupportLevel(dataType, keySig)),
+        ("value", valueSig.getSupportLevel(dataType, valueSig)))))
  }
}
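To make the tagging rule concrete, here is a standalone sketch of the duplicate-literal-key detection performed in tag above (the key values are hypothetical; plain Scala collections only):

import scala.collection.mutable

// Literal keys as they might appear in map("key1", b, "key2", a, "key1", c)
val literalKeys = Seq("key1", "key2", "key1")
val uniqueKeys = new mutable.HashSet[Any]()
val duplicates = literalKeys.filterNot(uniqueKeys.add)
// duplicates == Seq("key1"), so the expression would be tagged to stay on the CPU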

@@ -18,6 +18,7 @@ package org.apache.spark.sql.rapids

import ai.rapids.cudf.{ColumnVector, DType}
import com.nvidia.spark.rapids.{GpuColumnVector, GpuExpression, GpuExpressionsUtils}
import com.nvidia.spark.rapids.RapidsPluginImplicits.AutoCloseableProducingSeq

import org.apache.spark.sql.catalyst.analysis.{TypeCheckResult, TypeCoercion}
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FUNC_ALIAS
@@ -84,11 +85,11 @@ case class GpuCreateArray(children: Seq[Expression], useStringTypeWhenEmpty: Boo
case class GpuCreateMap(children: Seq[Expression], useStringTypeWhenEmpty: Boolean)
  extends GpuExpression {

-  // See https://github.com/NVIDIA/spark-rapids/issues/3229
-  require(children.length == 2)
+  private val valueIndices: Seq[Int] = children.indices.filter(_ % 2 != 0)
+  private val keyIndices: Seq[Int] = children.indices.filter(_ % 2 == 0)

-  lazy val keys = children.indices.filter(_ % 2 == 0).map(children)
-  lazy val values = children.indices.filter(_ % 2 != 0).map(children)
+  lazy val keys: Seq[Expression] = keyIndices.map(children)
+  lazy val values: Seq[Expression] = valueIndices.map(children)

  private val defaultElementType: DataType = {
    if (useStringTypeWhenEmpty) {
@@ -104,8 +105,10 @@ case class GpuCreateMap(children: Seq[Expression], useStringTypeWhenEmpty: Boole
      children.indices.foreach { index =>
        columns(index) = GpuExpressionsUtils.columnarEvalToColumn(children(index), batch).getBase
      }
-      withResource(ColumnVector.makeStruct(columns: _*)) { struct =>
-        GpuColumnVector.from(ColumnVector.makeList(numRows, DType.STRUCT, struct), dataType)
+      val structs = Range(0, columns.length, 2)
+        .safeMap(i => ColumnVector.makeStruct(columns(i), columns(i + 1)))
+      withResource(structs) { _ =>
+        GpuColumnVector.from(ColumnVector.makeList(numRows, DType.STRUCT, structs: _*), dataType)
      }
    }
  }
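The pairing step above is easiest to see on flat data. A minimal sketch (hypothetical values; plain Scala collections standing in for the cuDF columns) of how the alternating children are grouped:

// Children of CreateMap alternate key, value, key, value, ...
val columns = Seq("k1", "v1", "k2", "v2")
val pairs = Range(0, columns.length, 2).map(i => (columns(i), columns(i + 1)))
// pairs == Vector((k1,v1), (k2,v2)): each pair becomes one struct column on the GPU,
// and makeList then assembles the structs into a LIST<STRUCT<key,value>> column per row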