Skip to content

Commit

Permalink
Add in support for explode on maps (#3175)
Browse files Browse the repository at this point in the history
Signed-off-by: Robert (Bobby) Evans <bobby@apache.org>
  • Loading branch information
revans2 authored Aug 10, 2021
1 parent 2891e4a commit 92e0ecf
Show file tree
Hide file tree
Showing 7 changed files with 165 additions and 54 deletions.
4 changes: 2 additions & 2 deletions docs/supported_ops.md
Original file line number Diff line number Diff line change
Expand Up @@ -6218,7 +6218,7 @@ Accelerator support is described below.
<td> </td>
<td> </td>
<td><em>PS<br/>max nested DECIMAL precision of 18;<br/>UTC is only supported TZ for nested TIMESTAMP;<br/>missing nested BINARY, CALENDAR, UDT</em></td>
<td><b>NS</b></td>
<td><em>PS<br/>max nested DECIMAL precision of 18;<br/>UTC is only supported TZ for nested TIMESTAMP;<br/>missing nested BINARY, CALENDAR, UDT</em></td>
<td> </td>
<td> </td>
</tr>
Expand Down Expand Up @@ -11698,7 +11698,7 @@ Accelerator support is described below.
<td> </td>
<td> </td>
<td><em>PS<br/>max nested DECIMAL precision of 18;<br/>UTC is only supported TZ for nested TIMESTAMP;<br/>missing nested BINARY, CALENDAR, UDT</em></td>
<td><b>NS</b></td>
<td><em>PS<br/>max nested DECIMAL precision of 18;<br/>UTC is only supported TZ for nested TIMESTAMP;<br/>missing nested BINARY, CALENDAR, UDT</em></td>
<td> </td>
<td> </td>
</tr>
Expand Down
12 changes: 9 additions & 3 deletions integration_tests/src/main/python/asserts.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ def _assert_equal(cpu, gpu, float_check, path):
assert len(cpu) == len(gpu), "CPU and GPU list have different lengths at {} CPU: {} GPU: {}".format(path, len(cpu), len(gpu))
for index in range(len(cpu)):
_assert_equal(cpu[index], gpu[index], float_check, path + [index])
elif (t is tuple):
assert len(cpu) == len(gpu), "CPU and GPU list have different lengths at {} CPU: {} GPU: {}".format(path, len(cpu), len(gpu))
for index in range(len(cpu)):
_assert_equal(cpu[index], gpu[index], float_check, path + [index])
elif (t is pytypes.GeneratorType):
index = 0
# generator has no zip :( so we have to do this the hard way
Expand All @@ -64,9 +68,11 @@ def _assert_equal(cpu, gpu, float_check, path):

index = index + 1
elif (t is dict):
# TODO eventually we need to split this up so we can do the right thing for float/double
# values stored under the map some where, especially for NaNs
assert cpu == gpu, "GPU and CPU map values are different at {}".format(path)
# The order of key/values is not guaranteed in python dicts, nor are they guaranteed by Spark
# so sort the items to do our best with ignoring the order of dicts
cpu_items = list(cpu.items()).sort(key=_RowCmp)
gpu_items = list(gpu.items()).sort(key=_RowCmp)
_assert_equal(cpu_items, gpu_items, float_check, path + ["map"])
elif (t is int):
assert cpu == gpu, "GPU and CPU int values are different at {}".format(path)
elif (t is float):
Expand Down
6 changes: 3 additions & 3 deletions integration_tests/src/main/python/data_gen.py
Original file line number Diff line number Diff line change
Expand Up @@ -901,11 +901,11 @@ def gen_scalars_for_sql(data_gen, count, seed=0, force_no_nulls=False):
simple_string_to_string_map_gen = MapGen(StringGen(pattern='key_[0-9]', nullable=False),
StringGen(), max_length=10)

all_basic_map_gens = [MapGen(f(nullable=False), f()) for f in [BooleanGen, ByteGen, ShortGen, IntegerGen, LongGen, FloatGen, DoubleGen, DateGen, TimestampGen]] + [simple_string_to_string_map_gen]

# Some map gens, but not all because of nesting
map_gens_sample = [simple_string_to_string_map_gen,
MapGen(StringGen(pattern='key_[0-9]', nullable=False), ArrayGen(string_gen), max_length=10),
map_gens_sample = all_basic_map_gens + [MapGen(StringGen(pattern='key_[0-9]', nullable=False), ArrayGen(string_gen), max_length=10),
MapGen(RepeatSeqGen(IntegerGen(nullable=False), 10), long_gen, max_length=10),
MapGen(BooleanGen(nullable=False), boolean_gen, max_length=2),
MapGen(StringGen(pattern='key_[0-9]', nullable=False), simple_string_to_string_map_gen)]

allow_negative_scale_of_decimal_conf = {'spark.sql.legacy.allowNegativeScaleOfDecimal': 'true'}
Expand Down
40 changes: 40 additions & 0 deletions integration_tests/src/main/python/generate_expr_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,16 @@ def test_explode_array_data(spark_tmp_path, data_gen):
lambda spark: two_col_df(spark, *data_gen).selectExpr('a', 'explode(b)'),
conf=conf_to_enforce_split_input)

# Results are sorted locally because of https://github.com/NVIDIA/spark-rapids/issues/84;
# once 3.1.0 is the minimum supported Spark version this can be dropped.
@ignore_order(local=True)
@pytest.mark.parametrize('map_gen', map_gens_sample, ids=idfn)
def test_explode_map_data(spark_tmp_path, map_gen):
    # Explode a map column next to a plain int column and compare CPU vs GPU output.
    assert_gpu_and_cpu_are_equal_collect(
        lambda spark: two_col_df(spark, int_gen, map_gen).selectExpr('a', 'explode(b)'),
        conf=conf_to_enforce_split_input)

#sort locally because of https://github.com/NVIDIA/spark-rapids/issues/84
# Once 3.1.0 is the minimum supported Spark version this can be dropped
@ignore_order(local=True)
Expand All @@ -78,6 +88,16 @@ def test_explode_outer_array_data(spark_tmp_path, data_gen):
lambda spark: two_col_df(spark, *data_gen).selectExpr('a', 'explode_outer(b)'),
conf=conf_to_enforce_split_input)

# Results are sorted locally because of https://github.com/NVIDIA/spark-rapids/issues/84;
# once 3.1.0 is the minimum supported Spark version this can be dropped.
@ignore_order(local=True)
@pytest.mark.parametrize('map_gen', map_gens_sample, ids=idfn)
def test_explode_outer_map_data(spark_tmp_path, map_gen):
    # explode_outer keeps rows whose map is null/empty; compare CPU vs GPU output.
    assert_gpu_and_cpu_are_equal_collect(
        lambda spark: two_col_df(spark, int_gen, map_gen).selectExpr('a', 'explode_outer(b)'),
        conf=conf_to_enforce_split_input)

#sort locally because of https://github.com/NVIDIA/spark-rapids/issues/84
# Once 3.1.0 is the minimum supported Spark version this can be dropped
@ignore_order(local=True)
Expand Down Expand Up @@ -117,6 +137,16 @@ def test_posexplode_array_data(spark_tmp_path, data_gen):
lambda spark: two_col_df(spark, *data_gen).selectExpr('a', 'posexplode(b)'),
conf=conf_to_enforce_split_input)

# Results are sorted locally because of https://github.com/NVIDIA/spark-rapids/issues/84;
# once 3.1.0 is the minimum supported Spark version this can be dropped.
@ignore_order(local=True)
@pytest.mark.parametrize('map_gen', map_gens_sample, ids=idfn)
def test_posexplode_map_data(spark_tmp_path, map_gen):
    # posexplode adds a position column before the key/value; compare CPU vs GPU output.
    assert_gpu_and_cpu_are_equal_collect(
        lambda spark: two_col_df(spark, int_gen, map_gen).selectExpr('a', 'posexplode(b)'),
        conf=conf_to_enforce_split_input)

#sort locally because of https://github.com/NVIDIA/spark-rapids/issues/84
# Once 3.1.0 is the minimum supported Spark version this can be dropped
@ignore_order(local=True)
Expand All @@ -138,6 +168,16 @@ def test_posexplode_outer_array_data(spark_tmp_path, data_gen):
lambda spark: two_col_df(spark, *data_gen).selectExpr('a', 'posexplode_outer(b)'),
conf=conf_to_enforce_split_input)

# Results are sorted locally because of https://github.com/NVIDIA/spark-rapids/issues/84;
# once 3.1.0 is the minimum supported Spark version this can be dropped.
@ignore_order(local=True)
@pytest.mark.parametrize('map_gen', map_gens_sample, ids=idfn)
def test_posexplode_outer_map_data(spark_tmp_path, map_gen):
    # posexplode_outer: position column plus null/empty-map row preservation.
    assert_gpu_and_cpu_are_equal_collect(
        lambda spark: two_col_df(spark, int_gen, map_gen).selectExpr('a', 'posexplode_outer(b)'),
        conf=conf_to_enforce_split_input)

#sort locally because of https://github.com/NVIDIA/spark-rapids/issues/84
# Once 3.1.0 is the minimum supported Spark version this can be dropped
@ignore_order(local=True)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -695,7 +695,10 @@ static boolean typeConversionAllowed(ColumnView cv, DataType colType) {

static boolean typeConversionAllowed(Table table, DataType[] colTypes, int startCol, int endCol) {
final int numColumns = endCol - startCol;
assert numColumns == colTypes.length: "The number of columns and the number of types don't match";
assert numColumns == colTypes.length: "The number of columns and the number of types don't " +
"match. Expected " + colTypes.length + " but found " + numColumns + ". (" + table +
" columns " + startCol + " - " + endCol + " vs " +
Arrays.toString(colTypes) + ")";
boolean ret = true;
for (int colIndex = startCol; colIndex < endCol; colIndex++) {
boolean t = typeConversionAllowed(table.getColumn(colIndex), colTypes[colIndex - startCol]);
Expand Down
144 changes: 103 additions & 41 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuGenerateExec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import ai.rapids.cudf.{ColumnVector, ContiguousTable, NvtxColor, Table}
import org.apache.spark.TaskContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, CreateArray, Expression, Generator}
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, Expression, Generator}
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.execution.{GenerateExec, SparkPlan, UnaryExecNode}
import org.apache.spark.sql.rapids.GpuCreateArray
Expand Down Expand Up @@ -228,24 +228,121 @@ abstract class GpuExplodeBase extends GpuUnevaluableUnaryExpression with GpuGene

// Infer result schema of GenerateExec from input schema
protected def resultSchema(inputSchema: Array[DataType],
genOffset: Int,
includePos: Boolean = false): Array[DataType] = {
genOffset: Int): Array[DataType] = {
val outputSchema = ArrayBuffer[DataType]()
inputSchema.zipWithIndex.foreach {
// extract output type of explode from input ArrayData
case (dataType, index) if index == genOffset =>
require(dataType.isInstanceOf[ArrayType], "GpuExplode only supports ArrayData now")
if (includePos) {
if (position) {
outputSchema += IntegerType
}
outputSchema += dataType.asInstanceOf[ArrayType].elementType
dataType match {
case ArrayType(elementType, _) =>
outputSchema += elementType
case MapType(keyType, valueType, _) =>
outputSchema += keyType
outputSchema += valueType
}
// map types of other required columns
case (dataType, _) =>
outputSchema += dataType
}
outputSchema.toArray
}

/**
* A function that will do the explode or position explode
*/
private[this] def explodeFun(inputTable: Table, genOffset: Int, outer: Boolean): Table = {
if (position) {
if (outer) {
inputTable.explodeOuterPosition(genOffset)
} else {
inputTable.explodePosition(genOffset)
}
} else {
if (outer) {
inputTable.explodeOuter(genOffset)
} else {
inputTable.explode(genOffset)
}
}
}

  /**
   * Run explode/posexplode on the GPU for the single generator column.
   *
   * @param inputBatch input batch; the generator's input column must be the LAST column
   *                   (enforced by the require below)
   * @param generatorOffset index of the generator input column in the batch
   * @param outer true for the *_outer variants (null/empty inputs still produce a row)
   * @return a new batch with the generator column replaced by its exploded output
   */
  override def generate(inputBatch: ColumnarBatch,
      generatorOffset: Int,
      outer: Boolean): ColumnarBatch = {

    require(inputBatch.numCols() - 1 == generatorOffset,
      s"Internal Error ${getClass.getSimpleName} supports one and only one input attribute.")
    val schema = resultSchema(GpuColumnVector.extractTypes(inputBatch), generatorOffset)

    withResource(GpuColumnVector.from(inputBatch)) { table =>
      withResource(explodeFun(table, generatorOffset, outer)) { exploded =>
        child.dataType match {
          case _: ArrayType =>
            // For arrays the exploded table already matches the result schema
            GpuColumnVector.from(exploded, schema)
          case MapType(kt, vt, _) =>
            // We need to pull the key and value out of the struct column cudf produces
            // for maps, turning the one struct column into two separate columns.
            // `outer` is passed along because explode_outer needs a child-validity fixup.
            withResource(convertMapOutput(exploded, generatorOffset, kt, vt, outer)) { fixed =>
              GpuColumnVector.from(fixed, schema)
            }
          case other =>
            throw new IllegalArgumentException(
              s"$other is not supported as explode input right now")
        }
      }
    }
  }

  /**
   * Expand the STRUCT(KEY, VALUE) column that cudf produces when exploding a map into
   * two separate key and value columns, keeping every other column as-is.
   *
   * @param exploded the table returned by the cudf explode call
   * @param genOffset index where the generator output starts in `exploded`
   * @param kt the map's key type (needed to build a typed null scalar)
   * @param vt the map's value type (needed to build a typed null scalar)
   * @param fixChildValidity true for explode_outer: rows produced for null/empty maps have a
   *        null struct whose children may not be null themselves, so the children must be
   *        nulled out explicitly (workaround — see the TODO below)
   * @return a new Table owning its columns; caller must close it
   */
  private[this] def convertMapOutput(exploded: Table,
      genOffset: Int,
      kt: DataType,
      vt: DataType,
      fixChildValidity: Boolean): Table = {
    // posexplode inserts a position column before the struct column
    val numPos = if (position) 1 else 0
    // scalastyle:off line.size.limit
    // The input will look like the following, and we just want to expand the key, value in the
    // struct into separate columns
    // INDEX [0, genOffset)| genOffset | genOffset + numPos | [genOffset + numPos + 1, exploded.getNumberOfColumns)
    // SOME INPUT COLUMNS | POS COLUMN? | STRUCT(KEY, VALUE) | MORE INPUT COLUMNS
    // scalastyle:on line.size.limit
    val structPos = genOffset + numPos
    // withResource on the buffer closes every accumulated column on exit (success or failure);
    // NOTE(review): this relies on the Table constructor taking its own references to the
    // columns — confirm against the cudf Java Table API.
    withResource(ArrayBuffer.empty[ColumnVector]) { newColumns =>
      (0 until exploded.getNumberOfColumns).foreach { index =>
        if (index == structPos) {
          val kvStructCol = exploded.getColumn(index)
          if (fixChildValidity) {
            // TODO once explode outer is fixed remove the following workaround
            // https://github.com/rapidsai/cudf/issues/9003
            withResource(kvStructCol.isNull) { isNull =>
              // Where the struct row is null, force the key child to a typed null
              newColumns += withResource(kvStructCol.getChildColumnView(0)) { keyView =>
                withResource(GpuScalar.from(null, kt)) { nullKey =>
                  isNull.ifElse(nullKey, keyView)
                }
              }
              // ... and do the same for the value child
              newColumns += withResource(kvStructCol.getChildColumnView(1)) { valueView =>
                withResource(GpuScalar.from(null, vt)) { nullValue =>
                  isNull.ifElse(nullValue, valueView)
                }
              }
            }
          } else {
            // Non-outer case: child validity is already correct, just materialize the views
            newColumns += withResource(kvStructCol.getChildColumnView(0)) { keyView =>
              keyView.copyToColumnVector()
            }
            newColumns += withResource(kvStructCol.getChildColumnView(1)) { valueView =>
              valueView.copyToColumnVector()
            }
          }
        } else {
          // Pass-through column: take an extra reference since the buffer will close it
          newColumns += exploded.getColumn(index).incRefCount()
        }
      }
      new Table(newColumns: _*)
    }
  }

override def fixedLenLazyExpressions: Seq[Expression] = child match {
// GpuLiteral of ArrayData will be converted to GpuCreateArray with GpuLiterals
case GpuCreateArray(expressions, _) => expressions
Expand Down Expand Up @@ -332,45 +429,10 @@ abstract class GpuExplodeBase extends GpuUnevaluableUnaryExpression with GpuGene
}

case class GpuExplode(child: Expression) extends GpuExplodeBase {
  // explode() emits only the element (or key/value) columns — no position column.
  // The generate() override previously defined here was hoisted into GpuExplodeBase.
  override val position: Boolean = false
}

case class GpuPosExplode(child: Expression) extends GpuExplodeBase {
  // posexplode() prepends the element position as an extra output column.
  // The generate() override previously defined here was hoisted into GpuExplodeBase.
  override def position: Boolean = true
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2690,8 +2690,8 @@ object GpuOverrides {
TypeSig.ARRAY.nested(TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL_64 +
TypeSig.ARRAY + TypeSig.STRUCT + TypeSig.MAP),
TypeSig.ARRAY.nested(TypeSig.all),
TypeSig.ARRAY.nested(TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL_64 +
TypeSig.ARRAY + TypeSig.STRUCT + TypeSig.MAP),
(TypeSig.ARRAY + TypeSig.MAP).nested(TypeSig.commonCudfTypes + TypeSig.NULL +
TypeSig.DECIMAL_64 + TypeSig.ARRAY + TypeSig.STRUCT + TypeSig.MAP),
(TypeSig.ARRAY + TypeSig.MAP).nested(TypeSig.all)),
(a, conf, p, r) => new GeneratorExprMeta[Explode](a, conf, p, r) {
override val supportOuter: Boolean = true
Expand All @@ -2705,8 +2705,8 @@ object GpuOverrides {
TypeSig.ARRAY.nested(TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL_64 +
TypeSig.ARRAY + TypeSig.STRUCT + TypeSig.MAP),
TypeSig.ARRAY.nested(TypeSig.all),
TypeSig.ARRAY.nested(TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL_64 +
TypeSig.ARRAY + TypeSig.STRUCT + TypeSig.MAP),
(TypeSig.ARRAY + TypeSig.MAP).nested(TypeSig.commonCudfTypes + TypeSig.NULL +
TypeSig.DECIMAL_64 + TypeSig.ARRAY + TypeSig.STRUCT + TypeSig.MAP),
(TypeSig.ARRAY + TypeSig.MAP).nested(TypeSig.all)),
(a, conf, p, r) => new GeneratorExprMeta[PosExplode](a, conf, p, r) {
override val supportOuter: Boolean = true
Expand Down

0 comments on commit 92e0ecf

Please sign in to comment.