Add in support for explode on maps #3175

Merged: 4 commits merged on Aug 10, 2021
Changes from 2 commits
4 changes: 2 additions & 2 deletions docs/supported_ops.md
@@ -6218,7 +6218,7 @@ Accelerator support is described below.
<td> </td>
<td> </td>
<td><em>PS<br/>max nested DECIMAL precision of 18;<br/>UTC is only supported TZ for nested TIMESTAMP;<br/>missing nested BINARY, CALENDAR, UDT</em></td>
<td><b>NS</b></td>
<td><em>PS<br/>max nested DECIMAL precision of 18;<br/>UTC is only supported TZ for nested TIMESTAMP;<br/>missing nested BINARY, CALENDAR, UDT</em></td>
<td> </td>
<td> </td>
</tr>
@@ -11698,7 +11698,7 @@ Accelerator support is described below.
<td> </td>
<td> </td>
<td><em>PS<br/>max nested DECIMAL precision of 18;<br/>UTC is only supported TZ for nested TIMESTAMP;<br/>missing nested BINARY, CALENDAR, UDT</em></td>
<td><b>NS</b></td>
<td><em>PS<br/>max nested DECIMAL precision of 18;<br/>UTC is only supported TZ for nested TIMESTAMP;<br/>missing nested BINARY, CALENDAR, UDT</em></td>
<td> </td>
<td> </td>
</tr>
12 changes: 9 additions & 3 deletions integration_tests/src/main/python/asserts.py
@@ -40,6 +40,10 @@ def _assert_equal(cpu, gpu, float_check, path):
assert len(cpu) == len(gpu), "CPU and GPU list have different lengths at {} CPU: {} GPU: {}".format(path, len(cpu), len(gpu))
for index in range(len(cpu)):
_assert_equal(cpu[index], gpu[index], float_check, path + [index])
elif (t is tuple):
assert len(cpu) == len(gpu), "CPU and GPU tuples have different lengths at {} CPU: {} GPU: {}".format(path, len(cpu), len(gpu))
for index in range(len(cpu)):
_assert_equal(cpu[index], gpu[index], float_check, path + [index])
elif (t is pytypes.GeneratorType):
index = 0
# generator has no zip :( so we have to do this the hard way
@@ -64,9 +68,11 @@ def _assert_equal(cpu, gpu, float_check, path):

index = index + 1
elif (t is dict):
# TODO eventually we need to split this up so we can do the right thing for float/double
# values stored under the map some where, especially for NaNs
assert cpu == gpu, "GPU and CPU map values are different at {}".format(path)
# The order of key/value pairs is not guaranteed in Python dicts, nor is it guaranteed by
# Spark, so sort the items to compare the maps while ignoring their ordering
cpu_items = sorted(cpu.items(), key=_RowCmp)
gpu_items = sorted(gpu.items(), key=_RowCmp)
_assert_equal(cpu_items, gpu_items, float_check, path + ["map"])
elif (t is int):
assert cpu == gpu, "GPU and CPU int values are different at {}".format(path)
elif (t is float):
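
For readers skimming the diff, here is a minimal self-contained sketch of the idea behind the new dict branch. The _row_cmp/_RowCmp names below are hypothetical stand-ins for the repo's comparison-key helper, and the one subtlety worth calling out is that sorted() returns a new list while list.sort() sorts in place and returns None:

import functools

def _row_cmp(a, b):
    # Order None before everything else so entries with null parts still sort
    if a is None and b is None:
        return 0
    if a is None:
        return -1
    if b is None:
        return 1
    return (a > b) - (a < b)

_RowCmp = functools.cmp_to_key(_row_cmp)

def maps_equal_ignoring_order(cpu, gpu):
    # sorted() returns the sorted list; list(...).sort(...) would return None
    cpu_items = sorted(cpu.items(), key=_RowCmp)
    gpu_items = sorted(gpu.items(), key=_RowCmp)
    return cpu_items == gpu_items

assert maps_equal_ignoring_order({'a': 1, 'b': 2}, {'b': 2, 'a': 1})
assert not maps_equal_ignoring_order({'a': 1}, {'a': 2})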
6 changes: 3 additions & 3 deletions integration_tests/src/main/python/data_gen.py
@@ -901,11 +901,11 @@ def gen_scalars_for_sql(data_gen, count, seed=0, force_no_nulls=False):
simple_string_to_string_map_gen = MapGen(StringGen(pattern='key_[0-9]', nullable=False),
StringGen(), max_length=10)

all_basic_map_gens = [MapGen(f(nullable=False), f()) for f in [BooleanGen, ByteGen, ShortGen, IntegerGen, LongGen, FloatGen, DoubleGen, DateGen, TimestampGen]] + [simple_string_to_string_map_gen]

# Some map gens, but not all because of nesting
map_gens_sample = [simple_string_to_string_map_gen,
MapGen(StringGen(pattern='key_[0-9]', nullable=False), ArrayGen(string_gen), max_length=10),
map_gens_sample = all_basic_map_gens + [MapGen(StringGen(pattern='key_[0-9]', nullable=False), ArrayGen(string_gen), max_length=10),
MapGen(RepeatSeqGen(IntegerGen(nullable=False), 10), long_gen, max_length=10),
MapGen(BooleanGen(nullable=False), boolean_gen, max_length=2),
MapGen(StringGen(pattern='key_[0-9]', nullable=False), simple_string_to_string_map_gen)]

allow_negative_scale_of_decimal_conf = {'spark.sql.legacy.allowNegativeScaleOfDecimal': 'true'}
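
One detail the all_basic_map_gens comprehension encodes: every key generator is constructed with nullable=False because Spark does not allow null map keys, while map values may be null. A quick standalone check of that constraint in plain PySpark (the local session and the exact error wording are assumptions here):

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").getOrCreate()

# A null map *value* is fine
spark.sql("SELECT map('k', CAST(NULL AS INT))").show()

# A null map *key* fails at runtime, roughly: "Cannot use null as map key"
try:
    spark.sql("SELECT map(CAST(NULL AS STRING), 1)").show()
except Exception as err:
    print(err)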
40 changes: 40 additions & 0 deletions integration_tests/src/main/python/generate_expr_test.py
@@ -57,6 +57,16 @@ def test_explode_array_data(spark_tmp_path, data_gen):
lambda spark: two_col_df(spark, *data_gen).selectExpr('a', 'explode(b)'),
conf=conf_to_enforce_split_input)

#sort locally because of https://github.com/NVIDIA/spark-rapids/issues/84
# After 3.1.0 is the min spark version we can drop this
@ignore_order(local=True)
@pytest.mark.parametrize('map_gen', map_gens_sample, ids=idfn)
def test_explode_map_data(spark_tmp_path, map_gen):
data_gen = [int_gen, map_gen]
assert_gpu_and_cpu_are_equal_collect(
lambda spark: two_col_df(spark, *data_gen).selectExpr('a', 'explode(b)'),
conf=conf_to_enforce_split_input)

#sort locally because of https://github.com/NVIDIA/spark-rapids/issues/84
# After 3.1.0 is the min spark version we can drop this
@ignore_order(local=True)
@@ -78,6 +88,16 @@ def test_explode_outer_array_data(spark_tmp_path, data_gen):
lambda spark: two_col_df(spark, *data_gen).selectExpr('a', 'explode_outer(b)'),
conf=conf_to_enforce_split_input)

#sort locally because of https://github.com/NVIDIA/spark-rapids/issues/84
# After 3.1.0 is the min spark version we can drop this
@ignore_order(local=True)
@pytest.mark.parametrize('map_gen', map_gens_sample, ids=idfn)
def test_explode_outer_map_data(spark_tmp_path, map_gen):
data_gen = [int_gen, map_gen]
assert_gpu_and_cpu_are_equal_collect(
lambda spark: two_col_df(spark, *data_gen).selectExpr('a', 'explode_outer(b)'),
conf=conf_to_enforce_split_input)

#sort locally because of https://github.com/NVIDIA/spark-rapids/issues/84
# After 3.1.0 is the min spark version we can drop this
@ignore_order(local=True)
@@ -117,6 +137,16 @@ def test_posexplode_array_data(spark_tmp_path, data_gen):
lambda spark: two_col_df(spark, *data_gen).selectExpr('a', 'posexplode(b)'),
conf=conf_to_enforce_split_input)

#sort locally because of https://github.com/NVIDIA/spark-rapids/issues/84
# After 3.1.0 is the min spark version we can drop this
@ignore_order(local=True)
@pytest.mark.parametrize('map_gen', map_gens_sample, ids=idfn)
def test_posexplode_map_data(spark_tmp_path, map_gen):
data_gen = [int_gen, map_gen]
assert_gpu_and_cpu_are_equal_collect(
lambda spark: two_col_df(spark, *data_gen).selectExpr('a', 'posexplode(b)'),
conf=conf_to_enforce_split_input)

#sort locally because of https://github.com/NVIDIA/spark-rapids/issues/84
# After 3.1.0 is the min spark version we can drop this
@ignore_order(local=True)
@@ -138,6 +168,16 @@ def test_posexplode_outer_array_data(spark_tmp_path, data_gen):
lambda spark: two_col_df(spark, *data_gen).selectExpr('a', 'posexplode_outer(b)'),
conf=conf_to_enforce_split_input)

#sort locally because of https://github.com/NVIDIA/spark-rapids/issues/84
# After 3.1.0 is the min spark version we can drop this
@ignore_order(local=True)
@pytest.mark.parametrize('map_gen', map_gens_sample, ids=idfn)
def test_posexplode_outer_map_data(spark_tmp_path, map_gen):
data_gen = [int_gen, map_gen]
assert_gpu_and_cpu_are_equal_collect(
lambda spark: two_col_df(spark, *data_gen).selectExpr('a', 'posexplode_outer(b)'),
conf=conf_to_enforce_split_input)

#sort locally because of https://github.com/NVIDIA/spark-rapids/issues/84
# After 3.1.0 is the min spark version we can drop this
@ignore_order(local=True)
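
The four new map tests mirror the existing array tests. For orientation, this standalone PySpark sketch (session setup assumed; the real tests run through the harness in asserts.py) shows the column shape Spark produces when a generator is applied to a map:

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").getOrCreate()
df = spark.createDataFrame(
    [(1, {"k1": 10, "k2": 20}), (2, {})],
    "a INT, b MAP<STRING, INT>")

# explode(map) emits one row per entry as separate `key` and `value` columns
df.selectExpr("a", "explode(b)").show()

# posexplode(map) prepends an integer `pos` column: pos, key, value
df.selectExpr("a", "posexplode(b)").show()

# the *_outer variants keep rows whose map is empty or null, padding with
# nulls, which is exactly the case the cudf workaround in GpuGenerateExec handles
df.selectExpr("a", "explode_outer(b)").show()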
@@ -695,7 +695,10 @@ static boolean typeConversionAllowed(ColumnView cv, DataType colType) {

static boolean typeConversionAllowed(Table table, DataType[] colTypes, int startCol, int endCol) {
final int numColumns = endCol - startCol;
assert numColumns == colTypes.length: "The number of columns and the number of types don't match";
assert numColumns == colTypes.length: "The number of columns and the number of types don't " +
"match. expected " + colTypes.length + " but found " + numColumns + ". (" + table +
Review comment (Member), suggested change:
-"match. expected " + colTypes.length + " but found " + numColumns + ". (" + table +
+"match. Expected " + colTypes.length + " but found " + numColumns + ". (" + table +

startCol + " - " + endCol + " vs " +
Arrays.toString(colTypes) + ")";
boolean ret = true;
for (int colIndex = startCol; colIndex < endCol; colIndex++) {
boolean t = typeConversionAllowed(table.getColumn(colIndex), colTypes[colIndex - startCol]);
144 changes: 103 additions & 41 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuGenerateExec.scala
@@ -23,7 +23,7 @@ import ai.rapids.cudf.{ColumnVector, ContiguousTable, NvtxColor, Table}
import org.apache.spark.TaskContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, CreateArray, Expression, Generator}
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, Expression, Generator}
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.execution.{GenerateExec, SparkPlan, UnaryExecNode}
import org.apache.spark.sql.rapids.GpuCreateArray
@@ -228,24 +228,121 @@ abstract class GpuExplodeBase extends GpuUnevaluableUnaryExpression with GpuGene

// Infer result schema of GenerateExec from input schema
protected def resultSchema(inputSchema: Array[DataType],
genOffset: Int,
includePos: Boolean = false): Array[DataType] = {
genOffset: Int): Array[DataType] = {
val outputSchema = ArrayBuffer[DataType]()
inputSchema.zipWithIndex.foreach {
// extract output type of explode from input ArrayData
case (dataType, index) if index == genOffset =>
require(dataType.isInstanceOf[ArrayType], "GpuExplode only supports ArrayData now")
if (includePos) {
if (position) {
outputSchema += IntegerType
}
outputSchema += dataType.asInstanceOf[ArrayType].elementType
dataType match {
case ArrayType(elementType, _) =>
outputSchema += elementType
case MapType(keyType, valueType, _) =>
outputSchema += keyType
outputSchema += valueType
}
// map types of other required columns
case (dataType, _) =>
outputSchema += dataType
}
outputSchema.toArray
}

/**
* A function that will do the explode or position explode
*/
private[this] def explodeFun(inputTable: Table, genOffset: Int, outer: Boolean): Table = {
if (position) {
if (outer) {
inputTable.explodeOuterPosition(genOffset)
} else {
inputTable.explodePosition(genOffset)
}
} else {
if (outer) {
inputTable.explodeOuter(genOffset)
} else {
inputTable.explode(genOffset)
}
}
}

override def generate(inputBatch: ColumnarBatch,
generatorOffset: Int,
outer: Boolean): ColumnarBatch = {

require(inputBatch.numCols() - 1 == generatorOffset,
s"Internal Error ${getClass.getSimpleName} supports one and only one input attribute.")
val schema = resultSchema(GpuColumnVector.extractTypes(inputBatch), generatorOffset)

withResource(GpuColumnVector.from(inputBatch)) { table =>
withResource(explodeFun(table, generatorOffset, outer)) { exploded =>
child.dataType match {
case _: ArrayType =>
GpuColumnVector.from(exploded, schema)
case MapType(kt, vt, _) =>
// We need to pull the key and value out of the struct column
withResource(convertMapOutput(exploded, generatorOffset, kt, vt, outer)) { fixed =>
GpuColumnVector.from(fixed, schema)
}
case other =>
throw new IllegalArgumentException(
s"$other is not supported as explode input right now")
}
}
}
}

private[this] def convertMapOutput(exploded: Table,
genOffset: Int,
kt: DataType,
vt: DataType,
fixChildValidity: Boolean): Table = {
val numPos = if (position) 1 else 0
// scalastyle:off line.size.limit
// The input will look like the following, and we just want to expand the key, value in the
// struct into separate columns
// INDEX [0, genOffset)| genOffset | genOffset + numPos | [genOffset + numPos + 1, exploded.getNumberOfColumns)
// SOME INPUT COLUMNS | POS COLUMN? | STRUCT(KEY, VALUE) | MORE INPUT COLUMNS
// scalastyle:on line.size.limit
val structPos = genOffset + numPos
withResource (ArrayBuffer.empty[ColumnVector] ) { newColumns =>
Review comment (Member): Nit: some odd extra whitespace here. Suggested change:
-withResource (ArrayBuffer.empty[ColumnVector] ) { newColumns =>
+withResource(ArrayBuffer.empty[ColumnVector]) { newColumns =>

(0 until exploded.getNumberOfColumns).foreach { index =>
if (index == structPos) {
val kvStructCol = exploded.getColumn(index)
if (fixChildValidity) {
// TODO once explode outer is fixed remove the following workaround
// https://github.com/rapidsai/cudf/issues/9003
withResource(kvStructCol.isNull) { isNull =>
newColumns += withResource(kvStructCol.getChildColumnView(0)) { keyView =>
withResource(GpuScalar.from(null, kt)) { nullKey =>
isNull.ifElse(nullKey, keyView)
}
}
newColumns += withResource(kvStructCol.getChildColumnView(1)) { valueView =>
withResource(GpuScalar.from(null, vt)) { nullValue =>
isNull.ifElse(nullValue, valueView)
}
}
}
} else {
newColumns += withResource(kvStructCol.getChildColumnView(0)) { keyView =>
keyView.copyToColumnVector()
}
newColumns += withResource(kvStructCol.getChildColumnView(1)) { valueView =>
valueView.copyToColumnVector()
}
}
} else {
newColumns += exploded.getColumn(index).incRefCount()
}
}
new Table(newColumns: _*)
}
}

override def fixedLenLazyExpressions: Seq[Expression] = child match {
// GpuLiteral of ArrayData will be converted to GpuCreateArray with GpuLiterals
case GpuCreateArray(expressions, _) => expressions
@@ -332,45 +429,10 @@ abstract class GpuExplodeBase extends GpuUnevaluableUnaryExpression with GpuGene
}

case class GpuExplode(child: Expression) extends GpuExplodeBase {

override def generate(inputBatch: ColumnarBatch,
generatorOffset: Int,
outer: Boolean): ColumnarBatch = {

require(inputBatch.numCols() - 1 == generatorOffset,
"Internal Error GpuExplode supports one and only one input attribute.")
val schema = resultSchema(GpuColumnVector.extractTypes(inputBatch), generatorOffset)
val explodeFun = (t: Table) =>
if (outer) t.explodeOuter(generatorOffset) else t.explode(generatorOffset)
withResource(GpuColumnVector.from(inputBatch)) { table =>
withResource(explodeFun(table)) { exploded =>
GpuColumnVector.from(exploded, schema)
}
}
}

override val position: Boolean = false
}

case class GpuPosExplode(child: Expression) extends GpuExplodeBase {

override def generate(inputBatch: ColumnarBatch,
generatorOffset: Int,
outer: Boolean): ColumnarBatch = {

require(inputBatch.numCols() - 1 == generatorOffset,
"Internal Error GpuPosExplode supports one and only one input attribute.")
val schema = resultSchema(
GpuColumnVector.extractTypes(inputBatch), generatorOffset, includePos = true)
val explodePosFun = (t: Table) =>
if (outer) t.explodeOuterPosition(generatorOffset) else t.explodePosition(generatorOffset)
withResource(GpuColumnVector.from(inputBatch)) { table =>
withResource(explodePosFun(table)) { exploded =>
GpuColumnVector.from(exploded, schema)
}
}
}

override def position: Boolean = true
}

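
The core of the new code path is convertMapOutput: cudf explodes a map (a LIST of STRUCT(KEY, VALUE)) into a single struct column, which then has to be split into separate key and value columns, and, per the rapidsai/cudf#9003 workaround, the struct's children must be explicitly nulled wherever explode-outer produced a null struct row. A pure-Python model of that rearrangement, using a hypothetical lists-of-values representation rather than the cudf API:

def convert_map_output(columns, gen_offset, position, fix_child_validity):
    # columns is a list of plain Python lists; the exploded map entry sits at
    # gen_offset (+1 when a position column was emitted) as (key, value)
    # tuples, with None standing in for a null struct row from explode_outer
    num_pos = 1 if position else 0
    struct_pos = gen_offset + num_pos
    out = []
    for index, col in enumerate(columns):
        if index == struct_pos:
            if fix_child_validity:
                # null the children wherever the struct row itself is null
                out.append([None if e is None else e[0] for e in col])
                out.append([None if e is None else e[1] for e in col])
            else:
                out.append([e[0] for e in col])
                out.append([e[1] for e in col])
        else:
            out.append(col)
    return out

# explode_outer produced a null entry for the second row's empty map:
cols = [[1, 2], [("k1", 10), None]]
assert convert_map_output(cols, 1, False, True) == [[1, 2], ["k1", None], [10, None]]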
@@ -2679,8 +2679,8 @@ object GpuOverrides {
TypeSig.ARRAY.nested(TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL_64 +
TypeSig.ARRAY + TypeSig.STRUCT + TypeSig.MAP),
TypeSig.ARRAY.nested(TypeSig.all),
TypeSig.ARRAY.nested(TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL_64 +
TypeSig.ARRAY + TypeSig.STRUCT + TypeSig.MAP),
(TypeSig.ARRAY + TypeSig.MAP).nested(TypeSig.commonCudfTypes + TypeSig.NULL +
TypeSig.DECIMAL_64 + TypeSig.ARRAY + TypeSig.STRUCT + TypeSig.MAP),
(TypeSig.ARRAY + TypeSig.MAP).nested(TypeSig.all)),
(a, conf, p, r) => new GeneratorExprMeta[Explode](a, conf, p, r) {
override val supportOuter: Boolean = true
@@ -2694,8 +2694,8 @@ object GpuOverrides {
TypeSig.ARRAY.nested(TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL_64 +
TypeSig.ARRAY + TypeSig.STRUCT + TypeSig.MAP),
TypeSig.ARRAY.nested(TypeSig.all),
TypeSig.ARRAY.nested(TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL_64 +
TypeSig.ARRAY + TypeSig.STRUCT + TypeSig.MAP),
(TypeSig.ARRAY + TypeSig.MAP).nested(TypeSig.commonCudfTypes + TypeSig.NULL +
TypeSig.DECIMAL_64 + TypeSig.ARRAY + TypeSig.STRUCT + TypeSig.MAP),
(TypeSig.ARRAY + TypeSig.MAP).nested(TypeSig.all)),
(a, conf, p, r) => new GeneratorExprMeta[PosExplode](a, conf, p, r) {
override val supportOuter: Boolean = true
Expand Down