diff --git a/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/SparkPlanExecApiImpl.scala b/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/SparkPlanExecApiImpl.scala
index 713befb84883..2ab61f271043 100644
--- a/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/SparkPlanExecApiImpl.scala
+++ b/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/SparkPlanExecApiImpl.scala
@@ -136,6 +136,14 @@ class SparkPlanExecApiImpl extends SparkPlanExecApi {
     GenericExpressionTransformer(substraitExprName, Seq(child), expr)
   }
 
+  /** Transform inline to Substrait. */
+  override def genInlineTransformer(
+      substraitExprName: String,
+      child: ExpressionTransformer,
+      expr: Expression): ExpressionTransformer = {
+    GenericExpressionTransformer(substraitExprName, Seq(child), expr)
+  }
+
   /**
    * * Plans.
    */
diff --git a/backends-velox/src/test/scala/io/glutenproject/execution/TestOperator.scala b/backends-velox/src/test/scala/io/glutenproject/execution/TestOperator.scala
index 831a6014f0ba..f2b6fe8b3187 100644
--- a/backends-velox/src/test/scala/io/glutenproject/execution/TestOperator.scala
+++ b/backends-velox/src/test/scala/io/glutenproject/execution/TestOperator.scala
@@ -732,6 +732,30 @@ class TestOperator extends VeloxWholeStageTransformerSuite {
     }
   }
 
+  test("test inline function") {
+
+    withTempView("t1") {
+      sql("""select * from values
+            |  array(
+            |    named_struct('c1', 0, 'c2', 1),
+            |    null,
+            |    named_struct('c1', 2, 'c2', 3)
+            |  ),
+            |  array(
+            |    null,
+            |    named_struct('c1', 0, 'c2', 1),
+            |    named_struct('c1', 2, 'c2', 3)
+            |  )
+            |as tbl(a)
+         """.stripMargin).createOrReplaceTempView("t1")
+      runQueryAndCompare("""
+                           |SELECT inline(a) from t1;
+                           |""".stripMargin) {
+        checkOperatorMatch[GenerateExecTransformer]
+      }
+    }
+  }
+
   test("test array functions") {
     withTable("t") {
       sql("CREATE TABLE t (c1 ARRAY, c2 ARRAY, c3 STRING) using parquet")
diff --git a/cpp/velox/substrait/SubstraitToVeloxExpr.cc b/cpp/velox/substrait/SubstraitToVeloxExpr.cc
index efd9358d9c90..4071c1b0111b 100644
--- a/cpp/velox/substrait/SubstraitToVeloxExpr.cc
+++ b/cpp/velox/substrait/SubstraitToVeloxExpr.cc
@@ -580,7 +580,6 @@ core::TypedExprPtr SubstraitVeloxExprConverter::toVeloxExpr(
 core::TypedExprPtr SubstraitVeloxExprConverter::toVeloxExpr(
     const ::substrait::Expression& substraitExpr,
     const RowTypePtr& inputType) {
-  core::TypedExprPtr veloxExpr;
   auto typeCase = substraitExpr.rex_type_case();
   switch (typeCase) {
     case ::substrait::Expression::RexTypeCase::kLiteral:
diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.cc b/cpp/velox/substrait/SubstraitToVeloxPlan.cc
index e0a2c66f422c..4f0bdfa25455 100644
--- a/cpp/velox/substrait/SubstraitToVeloxPlan.cc
+++ b/cpp/velox/substrait/SubstraitToVeloxPlan.cc
@@ -724,8 +724,8 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::
   replicated.reserve(requiredChildOutput.size());
   for (const auto& output : requiredChildOutput) {
     auto expression = exprConverter_->toVeloxExpr(output, inputType);
-    auto expr_field = dynamic_cast(expression.get());
-    VELOX_CHECK(expr_field != nullptr, " the output in Generate Operator only support field")
+    auto exprField = dynamic_cast(expression.get());
+    VELOX_CHECK(exprField != nullptr, " the output in Generate Operator only support field")
     replicated.emplace_back(std::dynamic_pointer_cast(expression));
   }
 
diff --git a/gluten-core/src/main/scala/io/glutenproject/backendsapi/SparkPlanExecApi.scala b/gluten-core/src/main/scala/io/glutenproject/backendsapi/SparkPlanExecApi.scala
index b904f86998e8..f34e784b3866 100644
--- a/gluten-core/src/main/scala/io/glutenproject/backendsapi/SparkPlanExecApi.scala
+++ b/gluten-core/src/main/scala/io/glutenproject/backendsapi/SparkPlanExecApi.scala
@@ -207,6 +207,17 @@ trait SparkPlanExecApi {
     throw new UnsupportedOperationException("map_entries is not supported")
   }
 
+  /**
+   * Transform inline to Substrait. This default implementation throws
+   * UnsupportedOperationException; a backend that supports inline overrides it.
+   */
+  def genInlineTransformer(
+      substraitExprName: String,
+      child: ExpressionTransformer,
+      expr: Expression): ExpressionTransformer = {
+    throw new UnsupportedOperationException("inline is not supported")
+  }
+
   /**
    * Generate ShuffleDependency for ColumnarShuffleExchangeExec.
    *
diff --git a/gluten-core/src/main/scala/io/glutenproject/execution/GenerateExecTransformer.scala b/gluten-core/src/main/scala/io/glutenproject/execution/GenerateExecTransformer.scala
index b6ef5f7c956f..29e13bd7b248 100644
--- a/gluten-core/src/main/scala/io/glutenproject/execution/GenerateExecTransformer.scala
+++ b/gluten-core/src/main/scala/io/glutenproject/execution/GenerateExecTransformer.scala
@@ -120,7 +120,9 @@ case class GenerateExecTransformer(
         .asJava
     projectExpressions.addAll(childOutputNodes)
     val projectExprNode = ExpressionConverter
-      .replaceWithExpressionTransformer(generator.asInstanceOf[Explode].child, child.output)
+      .replaceWithExpressionTransformer(
+        generator.asInstanceOf[UnaryExpression].child,
+        child.output)
       .doTransform(args)
 
     projectExpressions.add(projectExprNode)
 
@@ -152,11 +154,11 @@ case class GenerateExecTransformer(
       operatorId: Long,
       inputAttributes: Seq[Attribute],
       input: RelNode,
-      generator: ExpressionNode,
+      generatorNode: ExpressionNode,
       childOutput: JList[ExpressionNode],
       validation: Boolean): RelNode = {
-    if (!validation) {
-      RelBuilder.makeGenerateRel(input, generator, childOutput, context, operatorId)
+    val generateRel = if (!validation) {
+      RelBuilder.makeGenerateRel(input, generatorNode, childOutput, context, operatorId)
     } else {
       // Use a extension node to send the input types through Substrait plan for validation.
       val inputTypeNodeList =
@@ -164,7 +166,55 @@ case class GenerateExecTransformer(
       val extensionNode = ExtensionBuilder.makeAdvancedExtension(
         BackendsApiManager.getTransformerApiInstance.packPBMessage(
           TypeBuilder.makeStruct(false, inputTypeNodeList).toProtobuf))
-      RelBuilder.makeGenerateRel(input, generator, childOutput, extensionNode, context, operatorId)
+      RelBuilder.makeGenerateRel(
+        input,
+        generatorNode,
+        childOutput,
+        extensionNode,
+        context,
+        operatorId)
+    }
+    applyPostProjectOnGenerator(generateRel, context, operatorId, childOutput, validation)
+  }
+
+  // There are 3 types of CollectionGenerator in spark: Explode, PosExplode and Inline.
+  // Only Inline needs the post projection.
+  private def applyPostProjectOnGenerator(
+      generateRel: RelNode,
+      context: SubstraitContext,
+      operatorId: Long,
+      childOutput: JList[ExpressionNode],
+      validation: Boolean): RelNode = {
+    generator match {
+      case Inline(inlineChild) =>
+        inlineChild match {
+          case _: AttributeReference =>
+          case _ =>
+            throw new UnsupportedOperationException("Child of Inline is not AttributeReference.")
+        }
+        val requiredOutput = (0 until childOutput.size).map {
+          ExpressionBuilder.makeSelection(_)
+        }
+        val flattenStruct: Seq[ExpressionNode] = generatorOutput.indices.map {
+          i =>
+            val selectionNode = ExpressionBuilder.makeSelection(requiredOutput.size)
+            selectionNode.addNestedChildIdx(i)
+        }
+        val postProjectRel = RelBuilder.makeProjectRel(
+          generateRel,
+          (requiredOutput ++ flattenStruct).asJava,
+          context,
+          operatorId,
+          1 + requiredOutput.size // 1 stands for the inner struct field from array.
+        )
+        if (validation) {
+          // No need to validate the project rel on the native side as
+          // it only flattens the generator's output.
+          generateRel
+        } else {
+          postProjectRel
+        }
+      case _ => generateRel
     }
   }
 
diff --git a/gluten-core/src/main/scala/io/glutenproject/expression/ExpressionConverter.scala b/gluten-core/src/main/scala/io/glutenproject/expression/ExpressionConverter.scala
index 5a74aae9cc0a..5c994bdc0a28 100644
--- a/gluten-core/src/main/scala/io/glutenproject/expression/ExpressionConverter.scala
+++ b/gluten-core/src/main/scala/io/glutenproject/expression/ExpressionConverter.scala
@@ -173,6 +173,11 @@ object ExpressionConverter extends SQLConfHelper with Logging {
           replaceWithExpressionTransformerInternal(p.child, attributeSeq, expressionsMap),
           p,
           attributeSeq)
+      case i: Inline =>
+        BackendsApiManager.getSparkPlanExecApiInstance.genInlineTransformer(
+          substraitExprName,
+          replaceWithExpressionTransformerInternal(i.child, attributeSeq, expressionsMap),
+          i)
       case a: Alias =>
         BackendsApiManager.getSparkPlanExecApiInstance.genAliasTransformer(
           substraitExprName,
diff --git a/gluten-core/src/main/scala/io/glutenproject/expression/ExpressionMappings.scala b/gluten-core/src/main/scala/io/glutenproject/expression/ExpressionMappings.scala
index 117a27d49479..b6a874756ed2 100644
--- a/gluten-core/src/main/scala/io/glutenproject/expression/ExpressionMappings.scala
+++ b/gluten-core/src/main/scala/io/glutenproject/expression/ExpressionMappings.scala
@@ -196,6 +196,7 @@ object ExpressionMappings {
     Sig[Sequence](SEQUENCE),
     Sig[CreateArray](CREATE_ARRAY),
     Sig[Explode](EXPLODE),
+    Sig[Inline](INLINE),
     Sig[ArrayAggregate](AGGREGATE),
     Sig[LambdaFunction](LAMBDAFUNCTION),
     Sig[NamedLambdaVariable](NAMED_LAMBDA_VARIABLE),
diff --git a/shims/common/src/main/scala/io/glutenproject/expression/ExpressionNames.scala b/shims/common/src/main/scala/io/glutenproject/expression/ExpressionNames.scala
index 0839c6d84395..c688c8abd43d 100644
--- a/shims/common/src/main/scala/io/glutenproject/expression/ExpressionNames.scala
+++ b/shims/common/src/main/scala/io/glutenproject/expression/ExpressionNames.scala
@@ -255,6 +255,7 @@ object ExpressionNames {
   final val AGGREGATE = "aggregate"
   final val LAMBDAFUNCTION = "lambdafunction"
   final val EXPLODE = "explode"
+  final val INLINE = "inline"
   final val POSEXPLODE = "posexplode"
   final val CHECK_OVERFLOW = "check_overflow"
   final val MAKE_DECIMAL = "make_decimal"