diff --git a/NOTICE-binary b/NOTICE-binary index 8066d8545c3..0c0021116d0 100644 --- a/NOTICE-binary +++ b/NOTICE-binary @@ -12,17 +12,6 @@ Copyright 2014 and onwards The Apache Software Foundation This product includes software developed at The Apache Software Foundation (http://www.apache.org/). ---------------------------------------------------------------------- - -Apache ORC -Copyright 2013-2019 The Apache Software Foundation - -This product includes software developed by The Apache Software -Foundation (http://www.apache.org/). - -This product includes software developed by Hewlett-Packard: -(c) Copyright [2014-2015] Hewlett-Packard Development Company, L.P - --------------------------------------------------------------------- UCF Consortium - Unified Communication X (UCX) diff --git a/aggregator/pom.xml b/aggregator/pom.xml index 4ea80017800..b4663bafa2c 100644 --- a/aggregator/pom.xml +++ b/aggregator/pom.xml @@ -88,53 +88,8 @@ - org.apache.orc. - ${rapids.shade.package}.orc. - - - org.apache.hadoop.hive. - ${rapids.shade.package}.hadoop.hive. - - - org.apache.hadoop.hive.conf.HiveConf - org.apache.hadoop.hive.ql.exec.FunctionRegistry - org.apache.hadoop.hive.ql.exec.UDF - org.apache.hadoop.hive.ql.exec.UDFMethodResolver - org.apache.hadoop.hive.ql.udf.UDFType - org.apache.hadoop.hive.ql.udf.generic.GenericUDF - org.apache.hadoop.hive.ql.udf.generic.GenericUDF$DeferredObject - org.apache.hadoop.hive.ql.udf.generic.GenericUDFUtils$ConversionHelper - org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector - org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory - org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory$ObjectInspectorOptions - org.apache.hadoop.hive.serde2.objectinspector.StructField - org.apache.hadoop.hive.serde2.typeinfo.TypeInfo - - - - org.apache.hive. - ${rapids.shade.package}.hive. - - - io.airlift.compress. - ${rapids.shade.package}.io.airlift.compress. - - - org.apache.commons.codec. - ${rapids.shade.package}.org.apache.commons.codec. - - - org.apache.commons.lang. - ${rapids.shade.package}.org.apache.commons.lang. - - - com.google - ${rapids.shade.package}.com.google + com.google.flatbuffers + ${rapids.shade.package}.com.google.flatbuffers diff --git a/common/pom.xml b/common/pom.xml new file mode 100644 index 00000000000..3f46ea8459f --- /dev/null +++ b/common/pom.xml @@ -0,0 +1,96 @@ + + + + + + 4.0.0 + + + com.nvidia + rapids-4-spark-parent + 22.04.0-SNAPSHOT + + + rapids-4-spark-common_2.12 + RAPIDS Accelerator for Apache Spark Common + Utility code that is common across the RAPIDS Accelerator projects + 22.04.0-SNAPSHOT + + + + org.scala-lang + scala-library + + + org.scalatest + scalatest_${scala.binary.version} + test + + + + + + + + ${project.build.directory}/extra-resources + true + + + ${project.basedir}/.. + META-INF + + + LICENSE + + + + + + org.apache.maven.plugins + maven-jar-plugin + + + default-test-jar + none + + + + + + org.apache.maven.plugins + maven-surefire-plugin + + true + + + + net.alchim31.maven + scala-maven-plugin + + + org.apache.rat + apache-rat-plugin + + + org.scalatest + scalatest-maven-plugin + + + + diff --git a/common/src/main/scala/com/nvidia/spark/rapids/CheckUtils.scala b/common/src/main/scala/com/nvidia/spark/rapids/CheckUtils.scala new file mode 100644 index 00000000000..65ab724cc50 --- /dev/null +++ b/common/src/main/scala/com/nvidia/spark/rapids/CheckUtils.scala @@ -0,0 +1,23 @@ +/* + * Copyright (c) 2022, NVIDIA CORPORATION. 
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.nvidia.spark.rapids
+
+object CheckUtils {
+  def checkArgument(expression: Boolean, msg: String): Unit = {
+    if (!expression) throw new IllegalArgumentException(msg)
+  }
+}
diff --git a/common/src/main/scala/com/nvidia/spark/rapids/ThreadFactoryBuilder.scala b/common/src/main/scala/com/nvidia/spark/rapids/ThreadFactoryBuilder.scala
new file mode 100644
index 00000000000..d61dd5a9c90
--- /dev/null
+++ b/common/src/main/scala/com/nvidia/spark/rapids/ThreadFactoryBuilder.scala
@@ -0,0 +1,52 @@
+/*
+ * Copyright (c) 2022, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.nvidia.spark.rapids
+
+import java.util.concurrent.{Executors, ThreadFactory}
+import java.util.concurrent.atomic.AtomicLong
+
+// This is similar to Guava's ThreadFactoryBuilder.
+// We avoid Guava because it is a messy dependency in practice.
+class ThreadFactoryBuilder {
+  private var nameFormat = Option.empty[String]
+  private var daemon = Option.empty[Boolean]
+
+  def setNameFormat(nameFormat: String): ThreadFactoryBuilder = {
+    nameFormat.format(0)
+    this.nameFormat = Some(nameFormat)
+    this
+  }
+
+  def setDaemon(daemon: Boolean): ThreadFactoryBuilder = {
+    this.daemon = Some(daemon)
+    this
+  }
+
+  def build(): ThreadFactory = {
+    val count = nameFormat.map(_ => new AtomicLong(0))
+    new ThreadFactory() {
+      private val defaultThreadFactory = Executors.defaultThreadFactory
+
+      override def newThread(r: Runnable): Thread = {
+        val thread = defaultThreadFactory.newThread(r)
+        nameFormat.foreach(f => thread.setName(f.format(count.get.getAndIncrement())))
+        daemon.foreach(b => thread.setDaemon(b))
+        thread
+      }
+    }
+  }
+}
diff --git a/common/src/test/scala/com/nvidia/spark/rapids/ThreadFactoryBuilderTest.scala b/common/src/test/scala/com/nvidia/spark/rapids/ThreadFactoryBuilderTest.scala
new file mode 100644
index 00000000000..d71915f51d0
--- /dev/null
+++ b/common/src/test/scala/com/nvidia/spark/rapids/ThreadFactoryBuilderTest.scala
@@ -0,0 +1,83 @@
+/*
+ * Copyright (c) 2022, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.nvidia.spark.rapids
+
+import java.util.concurrent.{Callable, Executors}
+
+import org.scalatest.FunSuite
+
+class ThreadFactoryBuilderTest extends FunSuite {
+
+  test("test thread factory builder") {
+    val pool1 = Executors.newFixedThreadPool(2,
+      new ThreadFactoryBuilder().setNameFormat("thread-pool1-1 %s").setDaemon(true).build())
+    try {
+      var ret = pool1.submit(new Callable[String] {
+        override def call(): String = {
+          assert(Thread.currentThread().isDaemon)
+          assert(Thread.currentThread().getName == "thread-pool1-1 0")
+          ""
+        }
+      })
+      // Wait for the result; if the asserts above failed, get() throws an ExecutionException.
+      ret.get()
+      ret = pool1.submit(() => {
+        assert(Thread.currentThread().isDaemon)
+        assert(Thread.currentThread().getName == "thread-pool1-1 1")
+        ""
+      })
+      ret.get()
+    } finally {
+      pool1.shutdown()
+    }
+
+    val pool2 = Executors.newFixedThreadPool(2,
+      new ThreadFactoryBuilder().setNameFormat("pool2-%d").build())
+    try {
+      var ret = pool2.submit(new Callable[String] {
+        override def call(): String = {
+          assert(!Thread.currentThread().isDaemon)
+          assert(Thread.currentThread().getName == "pool2-0")
+          ""
+        }
+      })
+      ret.get()
+      ret = pool2.submit(() => {
+        assert(!Thread.currentThread().isDaemon)
+        assert(Thread.currentThread().getName == "pool2-1")
+        ""
+      })
+      ret.get()
+    } finally {
+      pool2.shutdown()
+    }
+
+    val pool3 = Executors.newFixedThreadPool(2,
+      new ThreadFactoryBuilder().setNameFormat("pool3-%d").setDaemon(false).build())
+    try {
+      pool3.submit(new Callable[String] {
+        override def call(): String = {
+          assert(!Thread.currentThread().isDaemon)
+          assert(Thread.currentThread().getName == "pool3-0")
+          ""
+        }
+      }).get()
+    } finally {
+      pool3.shutdown()
+    }
+  }
+}
diff --git a/jenkins/databricks/build.sh b/jenkins/databricks/build.sh
index 261dd6bec0a..ef8521aea95 100755
--- a/jenkins/databricks/build.sh
+++ b/jenkins/databricks/build.sh
@@ -1,6 +1,6 @@
 #!/bin/bash
 #
-# Copyright (c) 2020-2021, NVIDIA CORPORATION. All rights reserved.
+# Copyright (c) 2020-2022, NVIDIA CORPORATION. All rights reserved.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -87,11 +87,19 @@ then PARQUETHADOOPJAR=----workspace_${SPARK_MAJOR_VERSION_STRING}--maven-trees--hive-2.3__hadoop-2.7--org.apache.parquet--parquet-hadoop--org.apache.parquet__parquet-hadoop__1.10.1-databricks9.jar PARQUETCOMMONJAR=----workspace_${SPARK_MAJOR_VERSION_STRING}--maven-trees--hive-2.3__hadoop-2.7--org.apache.parquet--parquet-common--org.apache.parquet__parquet-common__1.10.1-databricks9.jar PARQUETCOLUMNJAR=----workspace_${SPARK_MAJOR_VERSION_STRING}--maven-trees--hive-2.3__hadoop-2.7--org.apache.parquet--parquet-column--org.apache.parquet__parquet-column__1.10.1-databricks9.jar + ORC_CORE_JAR=----workspace_${SPARK_MAJOR_VERSION_STRING}--maven-trees--hive-2.3__hadoop-2.7--org.apache.orc--orc-core--org.apache.orc__orc-core__1.5.12.jar + ORC_SHIM_JAR=----workspace_${SPARK_MAJOR_VERSION_STRING}--maven-trees--hive-2.3__hadoop-2.7--org.apache.orc--orc-shims--org.apache.orc__orc-shims__1.5.12.jar + ORC_MAPREDUCE_JAR=----workspace_${SPARK_MAJOR_VERSION_STRING}--maven-trees--hive-2.3__hadoop-2.7--org.apache.orc--orc-mapreduce--org.apache.orc__orc-mapreduce__1.5.12.jar else PARQUETHADOOPJAR=----workspace_${SPARK_MAJOR_VERSION_STRING}--maven-trees--hive-2.3__hadoop-2.7--org.apache.parquet--parquet-hadoop--org.apache.parquet__parquet-hadoop__1.10.1-databricks6.jar PARQUETCOMMONJAR=----workspace_${SPARK_MAJOR_VERSION_STRING}--maven-trees--hive-2.3__hadoop-2.7--org.apache.parquet--parquet-common--org.apache.parquet__parquet-common__1.10.1-databricks6.jar PARQUETCOLUMNJAR=----workspace_${SPARK_MAJOR_VERSION_STRING}--maven-trees--hive-2.3__hadoop-2.7--org.apache.parquet--parquet-column--org.apache.parquet__parquet-column__1.10.1-databricks6.jar + ORC_CORE_JAR=----workspace_${SPARK_MAJOR_VERSION_STRING}--maven-trees--hive-2.3__hadoop-2.7--org.apache.orc--orc-core--org.apache.orc__orc-core__1.5.10.jar + ORC_SHIM_JAR=----workspace_${SPARK_MAJOR_VERSION_STRING}--maven-trees--hive-2.3__hadoop-2.7--org.apache.orc--orc-shims--org.apache.orc__orc-shims__1.5.10.jar + ORC_MAPREDUCE_JAR=----workspace_${SPARK_MAJOR_VERSION_STRING}--maven-trees--hive-2.3__hadoop-2.7--org.apache.orc--orc-mapreduce--org.apache.orc__orc-mapreduce__1.5.10.jar fi + +PROTOBUF_JAR=----workspace_${SPARK_MAJOR_VERSION_STRING}--maven-trees--hive-2.3__hadoop-2.7--com.google.protobuf--protobuf-java--com.google.protobuf__protobuf-java__2.6.1.jar PARQUETFORMATJAR=----workspace_${SPARK_MAJOR_VERSION_STRING}--maven-trees--hive-2.3__hadoop-2.7--org.apache.parquet--parquet-format--org.apache.parquet__parquet-format__2.4.0.jar NETWORKCOMMON=----workspace_${SPARK_MAJOR_VERSION_STRING}--common--network-common--network-common-hive-2.3__hadoop-2.7_2.12_deploy.jar @@ -363,6 +371,38 @@ mvn -B install:install-file \ -Dversion=$SPARK_VERSION_TO_INSTALL_DATABRICKS_JARS \ -Dpackaging=jar +mvn -B install:install-file \ + -Dmaven.repo.local=$M2DIR \ + -Dfile=$JARDIR/$ORC_CORE_JAR \ + -DgroupId=org.apache.orc \ + -DartifactId=orc-core \ + -Dversion=$SPARK_VERSION_TO_INSTALL_DATABRICKS_JARS \ + -Dpackaging=jar + +mvn -B install:install-file \ + -Dmaven.repo.local=$M2DIR \ + -Dfile=$JARDIR/$ORC_SHIM_JAR \ + -DgroupId=org.apache.orc \ + -DartifactId=orc-shims \ + -Dversion=$SPARK_VERSION_TO_INSTALL_DATABRICKS_JARS \ + -Dpackaging=jar + +mvn -B install:install-file \ + -Dmaven.repo.local=$M2DIR \ + -Dfile=$JARDIR/$ORC_MAPREDUCE_JAR \ + -DgroupId=org.apache.orc \ + -DartifactId=orc-mapreduce \ + -Dversion=$SPARK_VERSION_TO_INSTALL_DATABRICKS_JARS \ + -Dpackaging=jar + +mvn -B install:install-file \ + -Dmaven.repo.local=$M2DIR \ + 
-Dfile=$JARDIR/$PROTOBUF_JAR \ + -DgroupId=com.google.protobuf \ + -DartifactId=protobuf-java \ + -Dversion=$SPARK_VERSION_TO_INSTALL_DATABRICKS_JARS \ + -Dpackaging=jar + mvn -B -Ddatabricks -Dbuildver=$BUILDVER clean package -DskipTests cd /home/ubuntu diff --git a/pom.xml b/pom.xml index bc79b7eb526..23dbfa761bb 100644 --- a/pom.xml +++ b/pom.xml @@ -118,6 +118,7 @@ ${project.basedir}/src/main/301until310-all/scala ${project.basedir}/src/main/301until310-nondb/scala ${project.basedir}/src/main/301until320-all/scala + ${project.basedir}/src/main/301until320-noncdh/scala ${project.basedir}/src/main/301until320-nondb/scala ${project.basedir}/src/main/301until330-all/scala ${project.basedir}/src/main/pre320-treenode/scala @@ -129,6 +130,7 @@ + common dist integration_tests shuffle-plugin @@ -166,6 +168,7 @@ ${project.basedir}/src/main/301until310-all/scala ${project.basedir}/src/main/301until310-nondb/scala ${project.basedir}/src/main/301until320-all/scala + ${project.basedir}/src/main/301until320-noncdh/scala ${project.basedir}/src/main/301until320-nondb/scala ${project.basedir}/src/main/301until330-all/scala ${project.basedir}/src/main/pre320-treenode/scala @@ -182,6 +185,7 @@ spark302 + common dist integration_tests shuffle-plugin @@ -223,6 +227,7 @@ ${project.basedir}/src/main/301until310-all/scala ${project.basedir}/src/main/301until310-nondb/scala ${project.basedir}/src/main/301until320-all/scala + ${project.basedir}/src/main/301until320-noncdh/scala ${project.basedir}/src/main/301until320-nondb/scala ${project.basedir}/src/main/301until330-all/scala ${project.basedir}/src/main/pre320-treenode/scala @@ -234,6 +239,7 @@ + common dist integration_tests shuffle-plugin @@ -275,6 +281,7 @@ ${project.basedir}/src/main/301until310-all/scala ${project.basedir}/src/main/301until310-nondb/scala ${project.basedir}/src/main/301until320-all/scala + ${project.basedir}/src/main/301until320-noncdh/scala ${project.basedir}/src/main/301until320-nondb/scala ${project.basedir}/src/main/301until330-all/scala ${project.basedir}/src/main/pre320-treenode/scala @@ -286,6 +293,7 @@ + common dist integration_tests shuffle-plugin @@ -325,6 +333,7 @@ ${project.basedir}/src/main/301+-nondb/scala ${project.basedir}/src/main/311/scala ${project.basedir}/src/main/301until320-all/scala + ${project.basedir}/src/main/301until320-noncdh/scala ${project.basedir}/src/main/301until320-nondb/scala ${project.basedir}/src/main/301until330-all/scala ${project.basedir}/src/main/311+-all/scala @@ -342,6 +351,7 @@ + common dist integration_tests shuffle-plugin @@ -378,8 +388,6 @@ ${spark301db.version} ${spark301db.version} true - ${spark301db.version} - ${spark301db.version} @@ -396,6 +404,7 @@ ${project.basedir}/src/main/301db/scala ${project.basedir}/src/main/301until310-all/scala ${project.basedir}/src/main/301until320-all/scala + ${project.basedir}/src/main/301until320-noncdh/scala ${project.basedir}/src/main/301until330-all/scala ${project.basedir}/src/main/pre320-treenode/scala @@ -406,6 +415,7 @@ + common dist integration_tests shuffle-plugin @@ -439,8 +449,6 @@ ${spark312db.version} ${spark312db.version} true - ${spark312db.version} - ${spark312db.version} @@ -455,6 +463,7 @@ ${project.basedir}/src/main/301until320-all/scala + ${project.basedir}/src/main/301until320-noncdh/scala ${project.basedir}/src/main/312db/scala ${project.basedir}/src/main/301until330-all/scala ${project.basedir}/src/main/311until320-all/scala @@ -471,6 +480,7 @@ + common dist integration_tests shuffle-plugin @@ -508,6 +518,7 @@ 
${project.basedir}/src/main/301+-nondb/scala ${project.basedir}/src/main/312/scala ${project.basedir}/src/main/301until320-all/scala + ${project.basedir}/src/main/301until320-noncdh/scala ${project.basedir}/src/main/301until320-nondb/scala ${project.basedir}/src/main/301until330-all/scala ${project.basedir}/src/main/311+-all/scala @@ -525,6 +536,7 @@ + common dist integration_tests shuffle-plugin @@ -565,6 +577,7 @@ ${project.basedir}/src/main/301+-nondb/scala ${project.basedir}/src/main/313/scala ${project.basedir}/src/main/301until320-all/scala + ${project.basedir}/src/main/301until320-noncdh/scala ${project.basedir}/src/main/301until320-nondb/scala ${project.basedir}/src/main/301until330-all/scala ${project.basedir}/src/main/311+-all/scala @@ -582,6 +595,7 @@ + common dist integration_tests shuffle-plugin @@ -646,6 +660,7 @@ + common dist integration_tests shuffle-plugin @@ -709,6 +724,7 @@ + common dist integration_tests shuffle-plugin @@ -772,6 +788,7 @@ + common dist integration_tests shuffle-plugin @@ -834,6 +851,7 @@ + common dist integration_tests shuffle-plugin @@ -895,6 +913,7 @@ + common dist integration_tests shuffle-plugin @@ -963,8 +982,6 @@ 22.04.0-SNAPSHOT 2.12 2.12.15 - 1.5.10 - org.rogach diff --git a/shuffle-plugin/pom.xml b/shuffle-plugin/pom.xml index ed1654161ba..410106f07ac 100644 --- a/shuffle-plugin/pom.xml +++ b/shuffle-plugin/pom.xml @@ -47,6 +47,11 @@ 1.11 compile + + com.nvidia + rapids-4-spark-common_${scala.binary.version} + ${project.version} + com.nvidia rapids-4-spark-sql_${scala.binary.version} diff --git a/shuffle-plugin/src/main/scala/com/nvidia/spark/rapids/shuffle/ucx/UCX.scala b/shuffle-plugin/src/main/scala/com/nvidia/spark/rapids/shuffle/ucx/UCX.scala index 177eeef1a5c..b9557fb5f63 100644 --- a/shuffle-plugin/src/main/scala/com/nvidia/spark/rapids/shuffle/ucx/UCX.scala +++ b/shuffle-plugin/src/main/scala/com/nvidia/spark/rapids/shuffle/ucx/UCX.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020-2021, NVIDIA CORPORATION. + * Copyright (c) 2020-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -25,9 +25,9 @@ import java.util.concurrent.atomic.AtomicLong import scala.collection.mutable.ArrayBuffer import ai.rapids.cudf.{BaseDeviceMemoryBuffer, MemoryBuffer, NvtxColor, NvtxRange} -import com.google.common.util.concurrent.ThreadFactoryBuilder import com.nvidia.spark.rapids.{Arm, GpuDeviceManager, RapidsConf} import com.nvidia.spark.rapids.RapidsPluginImplicits._ +import com.nvidia.spark.rapids.ThreadFactoryBuilder import com.nvidia.spark.rapids.shuffle.{ClientConnection, MemoryRegistrationCallback, MessageType, MetadataTransportBuffer, TransportBuffer, TransportUtils} import org.openucx.jucx._ import org.openucx.jucx.ucp._ diff --git a/shuffle-plugin/src/main/scala/com/nvidia/spark/rapids/shuffle/ucx/UCXShuffleTransport.scala b/shuffle-plugin/src/main/scala/com/nvidia/spark/rapids/shuffle/ucx/UCXShuffleTransport.scala index 942a519b5cf..d7b22affda7 100644 --- a/shuffle-plugin/src/main/scala/com/nvidia/spark/rapids/shuffle/ucx/UCXShuffleTransport.scala +++ b/shuffle-plugin/src/main/scala/com/nvidia/spark/rapids/shuffle/ucx/UCXShuffleTransport.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020-2021, NVIDIA CORPORATION. + * Copyright (c) 2020-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -23,8 +23,8 @@ import scala.collection.mutable import scala.collection.mutable.ArrayBuffer import ai.rapids.cudf.{BaseDeviceMemoryBuffer, CudaMemoryBuffer, DeviceMemoryBuffer, HostMemoryBuffer, MemoryBuffer} -import com.google.common.util.concurrent.ThreadFactoryBuilder import com.nvidia.spark.rapids.{GpuDeviceManager, HashedPriorityQueue, RapidsConf} +import com.nvidia.spark.rapids.ThreadFactoryBuilder import com.nvidia.spark.rapids.shuffle._ import com.nvidia.spark.rapids.shuffle.{BounceBufferManager, BufferReceiveState, ClientConnection, PendingTransferRequest, RapidsShuffleClient, RapidsShuffleRequestHandler, RapidsShuffleServer, RapidsShuffleTransport, RefCountedDirectByteBuffer} diff --git a/sql-plugin/pom.xml b/sql-plugin/pom.xml index 33417cc2c81..35766938d06 100644 --- a/sql-plugin/pom.xml +++ b/sql-plugin/pom.xml @@ -36,9 +36,9 @@ ${cuda.version} - com.google.flatbuffers - flatbuffers-java - compile + com.nvidia + rapids-4-spark-common_${scala.binary.version} + ${project.version} org.scala-lang @@ -49,41 +49,12 @@ scalatest_${scala.binary.version} test + + + - org.apache.orc - orc-core - ${orc.classifier} - - - org.slf4j - slf4j-api - - - - - org.apache.orc - orc-mapreduce - ${orc.classifier} - - - com.google.code.findbugs - jsr305 - - - - - org.apache.hive - hive-storage-api - - - org.slf4j - slf4j-api - - - - - com.google.protobuf - protobuf-java + com.google.flatbuffers + flatbuffers-java @@ -290,6 +261,36 @@ ${spark.version} provided + + org.apache.orc + orc-core + ${spark.version} + provided + + + org.apache.orc + orc-shims + ${spark.version} + provided + + + org.apache.orc + orc-mapreduce + ${spark.version} + provided + + + org.apache.hive + hive-storage-api + ${spark.version} + provided + + + com.google.protobuf + protobuf-java + ${spark.version} + provided + diff --git a/sql-plugin/src/main/301until320-all/scala/com/nvidia/spark/rapids/shims/v2/OrcShims301until320Base.scala b/sql-plugin/src/main/301until320-all/scala/com/nvidia/spark/rapids/shims/v2/OrcShims301until320Base.scala new file mode 100644 index 00000000000..543cb7c9afd --- /dev/null +++ b/sql-plugin/src/main/301until320-all/scala/com/nvidia/spark/rapids/shims/v2/OrcShims301until320Base.scala @@ -0,0 +1,87 @@ +/* + * Copyright (c) 2022, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.nvidia.spark.rapids.shims.v2 + +import scala.collection.mutable.ArrayBuffer + +import com.nvidia.spark.rapids.OrcOutputStripe +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.hive.common.io.DiskRangeList +import org.apache.orc.{CompressionCodec, CompressionKind, DataReader, OrcFile, OrcProto, PhysicalWriter, Reader, StripeInformation} +import org.apache.orc.impl.{DataReaderProperties, OutStream, SchemaEvolution} +import org.apache.orc.impl.RecordReaderImpl.SargApplier + +trait OrcShims301until320Base { + + // read data to buffer + def readFileData(dataReader: DataReader, inputDataRanges: DiskRangeList): DiskRangeList = { + dataReader.readFileData(inputDataRanges, 0, false) + } + + // create reader properties builder + def newDataReaderPropertiesBuilder(compressionSize: Int, + compressionKind: CompressionKind, typeCount: Int): DataReaderProperties.Builder = { + DataReaderProperties.builder() + .withBufferSize(compressionSize) + .withCompression(compressionKind) + .withTypeCount(typeCount) + } + + // create ORC out stream + def newOrcOutStream(name: String, bufferSize: Int, codec: CompressionCodec, + receiver: PhysicalWriter.OutputReceiver): OutStream = { + new OutStream(name, bufferSize, codec, receiver) + } + + // filter stripes by pushing down filter + def filterStripes( + stripes: Seq[StripeInformation], + conf: Configuration, + orcReader: Reader, + dataReader: DataReader, + gen: (StripeInformation, OrcProto.StripeFooter, Array[Int], Array[Int]) => OrcOutputStripe, + evolution: SchemaEvolution, + sargApp: SargApplier, + sargColumns: Array[Boolean], + ignoreNonUtf8BloomFilter: Boolean, + writerVersion: OrcFile.WriterVersion, + fileIncluded: Array[Boolean], + columnMapping: Array[Int], + idMapping: Array[Int]): ArrayBuffer[OrcOutputStripe] = { + val result = new ArrayBuffer[OrcOutputStripe](stripes.length) + stripes.foreach { stripe => + val stripeFooter = dataReader.readStripeFooter(stripe) + val needStripe = if (sargApp != null) { + // An ORC schema is a single struct type describing the schema fields + val orcFileSchema = evolution.getFileType(0) + val orcIndex = dataReader.readRowIndex(stripe, orcFileSchema, stripeFooter, + ignoreNonUtf8BloomFilter, fileIncluded, null, sargColumns, + writerVersion, null, null) + val rowGroups = sargApp.pickRowGroups(stripe, orcIndex.getRowGroupIndex, + orcIndex.getBloomFilterKinds, stripeFooter.getColumnsList, orcIndex.getBloomFilterIndex, + true) + rowGroups != SargApplier.READ_NO_RGS + } else { + true + } + + if (needStripe) { + result.append(gen(stripe, stripeFooter, columnMapping, idMapping)) + } + } + result + } +} diff --git a/sql-plugin/src/main/301until320-noncdh/scala/com/nvidia/spark/rapids/shims/v2/OrcShims.scala b/sql-plugin/src/main/301until320-noncdh/scala/com/nvidia/spark/rapids/shims/v2/OrcShims.scala new file mode 100644 index 00000000000..dcac01eefe9 --- /dev/null +++ b/sql-plugin/src/main/301until320-noncdh/scala/com/nvidia/spark/rapids/shims/v2/OrcShims.scala @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2022, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.nvidia.spark.rapids.shims.v2 + +import com.nvidia.spark.rapids.RapidsPluginImplicits._ +import org.apache.orc.Reader + +object OrcShims extends OrcShims301until320Base { + + // the ORC Reader in non CDH Spark is closeable + def withReader[T <: AutoCloseable, V](r: T)(block: T => V): V = { + try { + block(r) + } finally { + r.safeClose() + } + } + + // the ORC Reader in non CDH Spark is closeable + def closeReader(reader: Reader): Unit = { + if (reader != null) { + reader.close() + } + } +} diff --git a/sql-plugin/src/main/311cdh/scala/com/nvidia/spark/rapids/shims/v2/OrcShims.scala b/sql-plugin/src/main/311cdh/scala/com/nvidia/spark/rapids/shims/v2/OrcShims.scala new file mode 100644 index 00000000000..ddc4534cb39 --- /dev/null +++ b/sql-plugin/src/main/311cdh/scala/com/nvidia/spark/rapids/shims/v2/OrcShims.scala @@ -0,0 +1,32 @@ +/* + * Copyright (c) 2022, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.nvidia.spark.rapids.shims.v2 + +import org.apache.orc.Reader + +object OrcShims extends OrcShims301until320Base { + + // ORC Reader of the 311cdh Spark has no close method. + // The resource is closed internally. + def withReader[V](r: Reader)(block: Reader => V): V = { + block(r) + } + + // empty + def closeReader(reader: Reader): Unit = { + } + +} diff --git a/sql-plugin/src/main/320+/scala/com/nvidia/spark/rapids/shims/v2/OrcShims.scala b/sql-plugin/src/main/320+/scala/com/nvidia/spark/rapids/shims/v2/OrcShims.scala new file mode 100644 index 00000000000..7a0fed4abc9 --- /dev/null +++ b/sql-plugin/src/main/320+/scala/com/nvidia/spark/rapids/shims/v2/OrcShims.scala @@ -0,0 +1,120 @@ +/* + * Copyright (c) 2022, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.nvidia.spark.rapids.shims.v2 + +import scala.collection.mutable.ArrayBuffer + +import com.nvidia.spark.rapids.OrcOutputStripe +import com.nvidia.spark.rapids.RapidsPluginImplicits._ +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.hive.common.io.DiskRangeList +import org.apache.orc.{CompressionCodec, CompressionKind, DataReader, OrcConf, OrcFile, OrcProto, PhysicalWriter, Reader, StripeInformation} +import org.apache.orc.impl.{BufferChunk, BufferChunkList, DataReaderProperties, InStream, OrcCodecPool, OutStream, ReaderImpl, SchemaEvolution} +import org.apache.orc.impl.RecordReaderImpl.SargApplier +import org.apache.orc.impl.reader.StripePlanner +import org.apache.orc.impl.writer.StreamOptions + +// 320+ ORC shims +object OrcShims { + + // the ORC Reader in non-CDH Spark is closeable + def withReader[T <: Reader, V](r: T)(block: T => V): V = { + try { + block(r) + } finally { + r.safeClose() + } + } + + // the ORC Reader in non-CDH Spark is closeable + def closeReader(reader: Reader): Unit = { + if(reader != null) { + reader.close() + } + } + + // read data to buffer + def readFileData(dataReader: DataReader, inputDataRanges: DiskRangeList): DiskRangeList = { + + // convert DiskRangeList to BufferChunkList + val chuckList = new BufferChunkList + var curr = inputDataRanges + while (curr != null) { + chuckList.add(new BufferChunk(curr.getOffset, curr.getLength)) + curr = curr.next + } + + // BufferChunk is subclass of DiskRangeList + dataReader.readFileData(chuckList, false).get() + } + + // create reader properties builder + def newDataReaderPropertiesBuilder(compressionSize: Int, + compressionKind: CompressionKind, typeCount: Int): DataReaderProperties.Builder = { + val compression = new InStream.StreamOptions() + .withBufferSize(compressionSize).withCodec(OrcCodecPool.getCodec(compressionKind)) + DataReaderProperties.builder().withCompression(compression) + } + + // create ORC out stream + def newOrcOutStream(name: String, bufferSize: Int, codec: CompressionCodec, + receiver: PhysicalWriter.OutputReceiver): OutStream = { + val options = new StreamOptions(bufferSize).withCodec(codec, codec.getDefaultOptions) + new OutStream(name, options, receiver) + } + + // filter stripes by pushing down filter + def filterStripes( + stripes: Seq[StripeInformation], + conf: Configuration, + orcReader: Reader, + dataReader: DataReader, + gen: (StripeInformation, OrcProto.StripeFooter, Array[Int], Array[Int]) => OrcOutputStripe, + evolution: SchemaEvolution, + sargApp: SargApplier, + sargColumns: Array[Boolean], + ignoreNonUtf8BloomFilter: Boolean, + writerVersion: OrcFile.WriterVersion, + fileIncluded: Array[Boolean], + columnMapping: Array[Int], + idMapping: Array[Int]): ArrayBuffer[OrcOutputStripe] = { + + val orcReaderImpl = orcReader.asInstanceOf[ReaderImpl] + val maxDiskRangeChunkLimit = OrcConf.ORC_MAX_DISK_RANGE_CHUNK_LIMIT.getInt(conf) + val planner = new StripePlanner(evolution.getFileSchema, orcReaderImpl.getEncryption(), + dataReader, writerVersion, ignoreNonUtf8BloomFilter, maxDiskRangeChunkLimit) + + val result = new ArrayBuffer[OrcOutputStripe](stripes.length) + stripes.foreach { stripe => + val stripeFooter = dataReader.readStripeFooter(stripe) + val needStripe = if (sargApp != null) { + val orcIndex = planner.parseStripe(stripe, fileIncluded).readRowIndex(sargColumns, null) + val rowGroups = sargApp.pickRowGroups(stripe, orcIndex.getRowGroupIndex, + orcIndex.getBloomFilterKinds, stripeFooter.getColumnsList, orcIndex.getBloomFilterIndex, + true) + 
rowGroups != SargApplier.READ_NO_RGS + } else { + true + } + + if (needStripe) { + result.append(gen(stripe, stripeFooter, columnMapping, idMapping)) + } + } + result + + } +} diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuMultiFileReader.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuMultiFileReader.scala index ffba671dc2d..0bffd6cd3cb 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuMultiFileReader.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuMultiFileReader.scala @@ -26,7 +26,6 @@ import scala.collection.mutable.{ArrayBuffer, LinkedHashMap, Queue} import scala.math.max import ai.rapids.cudf.{ColumnVector, HostMemoryBuffer, NvtxColor, NvtxRange, Table} -import com.google.common.util.concurrent.ThreadFactoryBuilder import com.nvidia.spark.rapids.GpuMetric.{NUM_OUTPUT_BATCHES, PEAK_DEVICE_MEMORY, SEMAPHORE_WAIT_TIME} import org.apache.commons.io.IOUtils import org.apache.hadoop.conf.Configuration diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScanBase.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScanBase.scala index e6063853eb2..6493d2cd3a7 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScanBase.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScanBase.scala @@ -36,6 +36,7 @@ import com.google.protobuf.CodedOutputStream import com.nvidia.spark.rapids.GpuMetric._ import com.nvidia.spark.rapids.RapidsPluginImplicits._ import com.nvidia.spark.rapids.SchemaUtils._ +import com.nvidia.spark.rapids.shims.v2.OrcShims import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.hive.common.io.DiskRangeList @@ -55,10 +56,10 @@ import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, Par import org.apache.spark.sql.execution.QueryExecutionException import org.apache.spark.sql.execution.datasources.PartitionedFile import org.apache.spark.sql.execution.datasources.orc.OrcUtils +import org.apache.spark.sql.execution.datasources.rapids.OrcFiltersWrapper import org.apache.spark.sql.execution.datasources.v2.{EmptyPartitionReader, FilePartitionReaderFactory} import org.apache.spark.sql.execution.datasources.v2.orc.OrcScan import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.rapids.OrcFilters import org.apache.spark.sql.rapids.execution.TrampolineUtil import org.apache.spark.sql.sources.Filter import org.apache.spark.sql.types.{ArrayType, DataType, DecimalType, MapType, StructType} @@ -319,13 +320,13 @@ trait OrcCommonFunctions extends OrcCodecWritingHelper { withResource(OrcTools.buildDataReader(ctx)) { dataReader => val start = System.nanoTime() - val bufferChunks = dataReader.readFileData(inputDataRanges, 0, false) + val bufferChunks = OrcShims.readFileData(dataReader, inputDataRanges) val mid = System.nanoTime() var current = bufferChunks while (current != null) { out.write(current.getData) if (dataReader.isTrackingDiskRanges && current.isInstanceOf[BufferChunk]) { - dataReader.releaseBuffer(current.asInstanceOf[BufferChunk].getChunk) + dataReader.releaseBuffer(current.getData) } current = current.next } @@ -740,17 +741,18 @@ private object OrcTools extends Arm { } val maxDiskRangeChunkLimit = OrcConf.ORC_MAX_DISK_RANGE_CHUNK_LIMIT.getInt(conf) val file = filePath.getFileSystem(conf).open(filePath) + + val typeCount = org.apache.orc.OrcUtils.getOrcTypes(fileSchema).size //noinspection ScalaDeprecation - 
RecordReaderUtils.createDefaultDataReader(DataReaderProperties.builder() - .withBufferSize(compressionSize) - .withCompression(compressionKind) - .withFileSystem(fs) - .withPath(filePath) - .withFile(file) // explicitly specify the FSDataInputStream - .withTypeCount(org.apache.orc.OrcUtils.getOrcTypes(fileSchema).size) - .withZeroCopy(zeroCopy) - .withMaxDiskRangeChunkLimit(maxDiskRangeChunkLimit) - .build()) + val reader = RecordReaderUtils.createDefaultDataReader( + OrcShims.newDataReaderPropertiesBuilder(compressionSize, compressionKind, typeCount) + .withFileSystem(fs) + .withPath(filePath) + .withZeroCopy(zeroCopy) + .withMaxDiskRangeChunkLimit(maxDiskRangeChunkLimit) + .build()) + reader.open() // 311cdh needs to initialize the internal FSDataInputStream file variable. + reader } } @@ -783,8 +785,8 @@ private case class GpuOrcFileFilterHandler( val orcFileReaderOpts = OrcFile.readerOptions(conf).filesystem(fs) // After getting the necessary information from ORC reader, we must close the ORC reader - withResource(OrcFile.createReader(filePath, orcFileReaderOpts)) { orcReader => - val resultedColPruneInfo = requestedColumnIds(isCaseSensitive, dataSchema, + OrcShims.withReader(OrcFile.createReader(filePath, orcFileReaderOpts)) { orcReader => + val resultedColPruneInfo = requestedColumnIds(isCaseSensitive, dataSchema, readDataSchema, orcReader) if (resultedColPruneInfo.isEmpty) { // Be careful when the OrcPartitionReaderContext is null, we should change @@ -822,7 +824,7 @@ private case class GpuOrcFileFilterHandler( val readerOpts = OrcInputFormat.buildOptions( conf, orcReader, partFile.start, partFile.length) // create the search argument if we have pushed filters - OrcFilters.createFilter(fullSchema, pushedFilters).foreach { f => + OrcFiltersWrapper.createFilter(fullSchema, pushedFilters).foreach { f => readerOpts.searchArgument(f, fullSchema.fieldNames) } readerOpts @@ -882,7 +884,7 @@ private case class GpuOrcFileFilterHandler( if (matchedOrcFields.size > 1) { // Need to fail if there is ambiguity, i.e. more than one field is matched. 
val matchedOrcFieldsString = matchedOrcFields.mkString("[", ", ", "]") - reader.close() + OrcShims.closeReader(reader) throw new RuntimeException(s"""Found duplicate field(s) "$requiredFieldName": """ + s"$matchedOrcFieldsString in case-insensitive mode") } else { @@ -1088,29 +1090,10 @@ private case class GpuOrcFileFilterHandler( val fileIncluded = calcOrcFileIncluded(evolution) val (columnMapping, idMapping) = columnRemap(fileIncluded, evolution.getFileSchema, updatedReadSchema, isCaseSensitive) - val result = new ArrayBuffer[OrcOutputStripe](stripes.length) - stripes.foreach { stripe => - val stripeFooter = dataReader.readStripeFooter(stripe) - val needStripe = if (sargApp != null) { - // An ORC schema is a single struct type describing the schema fields - val orcFileSchema = evolution.getFileType(0) - val orcIndex = dataReader.readRowIndex(stripe, orcFileSchema, stripeFooter, - ignoreNonUtf8BloomFilter, fileIncluded, null, sargColumns, - writerVersion, null, null) - val rowGroups = sargApp.pickRowGroups(stripe, orcIndex.getRowGroupIndex, - orcIndex.getBloomFilterKinds, stripeFooter.getColumnsList, orcIndex.getBloomFilterIndex, - true) - rowGroups != SargApplier.READ_NO_RGS - } else { - true - } - - if (needStripe) { - result.append(buildOutputStripe(stripe, stripeFooter, columnMapping, idMapping)) - } - } - - result + OrcShims.filterStripes(stripes, conf, orcReader, dataReader, + buildOutputStripe, evolution, + sargApp, sargColumns, ignoreNonUtf8BloomFilter, + writerVersion, fileIncluded, columnMapping, idMapping) } /** @@ -1552,8 +1535,8 @@ trait OrcCodecWritingHelper extends Arm { // note that this buffer is just for writing meta-data OrcConf.BUFFER_SIZE.getDefaultValue.asInstanceOf[Int] } - withResource(new OutStream(getClass.getSimpleName, orcBufferSize, codec, - outReceiver)) { codecStream => + withResource(OrcShims.newOrcOutStream( + getClass.getSimpleName, orcBufferSize, codec, outReceiver)) { codecStream => val protoWriter = CodedOutputStream.newInstance(codecStream) block(outChannel, protoWriter, codecStream) } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsShuffleHeartbeatManager.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsShuffleHeartbeatManager.scala index ac3705f7004..09dfdfc1869 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsShuffleHeartbeatManager.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsShuffleHeartbeatManager.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021, NVIDIA CORPORATION. + * Copyright (c) 2021-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,7 +20,6 @@ import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit} import scala.collection.mutable.ArrayBuffer -import com.google.common.util.concurrent.ThreadFactoryBuilder import org.apache.commons.lang3.mutable.MutableLong import org.apache.spark.SparkEnv diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/execution/datasources/rapids/OrcFiltersWrapper.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/execution/datasources/rapids/OrcFiltersWrapper.scala new file mode 100644 index 00000000000..65792c76c82 --- /dev/null +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/execution/datasources/rapids/OrcFiltersWrapper.scala @@ -0,0 +1,30 @@ +/* + * Copyright (c) 2022, NVIDIA CORPORATION. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.rapids + +import org.apache.hadoop.hive.ql.io.sarg.SearchArgument + +import org.apache.spark.sql.execution.datasources.orc.OrcFilters +import org.apache.spark.sql.sources.Filter +import org.apache.spark.sql.types.StructType + +// Wrapper for Spark OrcFilters which is in private package +object OrcFiltersWrapper { + def createFilter(schema: StructType, filters: Seq[Filter]): Option[SearchArgument] = { + OrcFilters.createFilter(schema, filters) + } +} diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/OrcFilters.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/OrcFilters.scala deleted file mode 100644 index 2dd9973cafd..00000000000 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/OrcFilters.scala +++ /dev/null @@ -1,278 +0,0 @@ -/* - * Copyright (c) 2019-2020, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.sql.rapids - -import java.time.{Instant, LocalDate} - -import org.apache.hadoop.hive.common.`type`.HiveDecimal -import org.apache.hadoop.hive.ql.io.sarg.{PredicateLeaf, SearchArgument} -import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.Builder -import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory.newBuilder -import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable - -import org.apache.spark.SparkException -import org.apache.spark.sql.catalyst.util.DateTimeUtils.{instantToMicros, localDateToDays, toJavaDate, toJavaTimestamp} -import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.sources.Filter -import org.apache.spark.sql.types._ - -// This is derived from Apache Spark's OrcFilters code to avoid calling the -// Spark version. Spark's version can potentially create a search argument -// applier object that is incompatible with the orc:nohive jar that has been -// shaded as part of this project. - -/** - * Helper object for building ORC `SearchArgument`s, which are used for ORC predicate push-down. - * - * Due to limitation of ORC `SearchArgument` builder, we had to implement separate checking and - * conversion passes through the Filter to make sure we only convert predicates that are known - * to be convertible. - * - * An ORC `SearchArgument` must be built in one pass using a single builder. For example, you can't - * build `a = 1` and `b = 2` first, and then combine them into `a = 1 AND b = 2`. 
This is quite - * different from the cases in Spark SQL or Parquet, where complex filters can be easily built using - * existing simpler ones. - * - * The annoying part is that, `SearchArgument` builder methods like `startAnd()`, `startOr()`, and - * `startNot()` mutate internal state of the builder instance. This forces us to translate all - * convertible filters with a single builder instance. However, if we try to translate a filter - * before checking whether it can be converted or not, we may end up with a builder whose internal - * state is inconsistent in the case of an inconvertible filter. - * - * For example, to convert an `And` filter with builder `b`, we call `b.startAnd()` first, and then - * try to convert its children. Say we convert `left` child successfully, but find that `right` - * child is inconvertible. Alas, `b.startAnd()` call can't be rolled back, and `b` is inconsistent - * now. - * - * The workaround employed here is to trim the Spark filters before trying to convert them. This - * way, we can only do the actual conversion on the part of the Filter that is known to be - * convertible. - * - * P.S.: Hive seems to use `SearchArgument` together with `ExprNodeGenericFuncDesc` only. Usage of - * builder methods mentioned above can only be found in test code, where all tested filters are - * known to be convertible. - */ -object OrcFilters extends OrcFiltersBase { - - /** - * Create ORC filter as a SearchArgument instance. - */ - def createFilter(schema: StructType, filters: Seq[Filter]): Option[SearchArgument] = { - val dataTypeMap = OrcFilters.getSearchableTypeMap(schema, SQLConf.get.caseSensitiveAnalysis) - // Combines all convertible filters using `And` to produce a single conjunction - val conjunctionOptional = buildTree(convertibleFilters(dataTypeMap, filters)) - conjunctionOptional.map { conjunction => - // Then tries to build a single ORC `SearchArgument` for the conjunction predicate. - // The input predicate is fully convertible. There should not be any empty result in the - // following recursive method call `buildSearchArgument`. - buildSearchArgument(dataTypeMap, conjunction, newBuilder).build() - } - } - - def convertibleFilters( - dataTypeMap: Map[String, OrcPrimitiveField], - filters: Seq[Filter]): Seq[Filter] = { - import org.apache.spark.sql.sources._ - - def convertibleFiltersHelper( - filter: Filter, - canPartialPushDown: Boolean): Option[Filter] = filter match { - // At here, it is not safe to just convert one side and remove the other side - // if we do not understand what the parent filters are. - // - // Here is an example used to explain the reason. - // Let's say we have NOT(a = 2 AND b in ('1')) and we do not understand how to - // convert b in ('1'). If we only convert a = 2, we will end up with a filter - // NOT(a = 2), which will generate wrong results. - // - // Pushing one side of AND down is only safe to do at the top level or in the child - // AND before hitting NOT or OR conditions, and in this case, the unsupported predicate - // can be safely removed. 
- case And(left, right) => - val leftResultOptional = convertibleFiltersHelper(left, canPartialPushDown) - val rightResultOptional = convertibleFiltersHelper(right, canPartialPushDown) - (leftResultOptional, rightResultOptional) match { - case (Some(leftResult), Some(rightResult)) => Some(And(leftResult, rightResult)) - case (Some(leftResult), None) if canPartialPushDown => Some(leftResult) - case (None, Some(rightResult)) if canPartialPushDown => Some(rightResult) - case _ => None - } - - // The Or predicate is convertible when both of its children can be pushed down. - // That is to say, if one/both of the children can be partially pushed down, the Or - // predicate can be partially pushed down as well. - // - // Here is an example used to explain the reason. - // Let's say we have - // (a1 AND a2) OR (b1 AND b2), - // a1 and b1 is convertible, while a2 and b2 is not. - // The predicate can be converted as - // (a1 OR b1) AND (a1 OR b2) AND (a2 OR b1) AND (a2 OR b2) - // As per the logical in And predicate, we can push down (a1 OR b1). - case Or(left, right) => - for { - lhs <- convertibleFiltersHelper(left, canPartialPushDown) - rhs <- convertibleFiltersHelper(right, canPartialPushDown) - } yield Or(lhs, rhs) - case Not(pred) => - val childResultOptional = convertibleFiltersHelper(pred, canPartialPushDown = false) - childResultOptional.map(Not) - case other => - for (_ <- buildLeafSearchArgument(dataTypeMap, other, newBuilder())) yield other - } - filters.flatMap { filter => - convertibleFiltersHelper(filter, true) - } - } - - /** - * Get PredicateLeafType which is corresponding to the given DataType. - */ - def getPredicateLeafType(dataType: DataType): PredicateLeaf.Type = dataType match { - case BooleanType => PredicateLeaf.Type.BOOLEAN - case ByteType | ShortType | IntegerType | LongType => PredicateLeaf.Type.LONG - case FloatType | DoubleType => PredicateLeaf.Type.FLOAT - case StringType => PredicateLeaf.Type.STRING - case DateType => PredicateLeaf.Type.DATE - case TimestampType => PredicateLeaf.Type.TIMESTAMP - case _: DecimalType => PredicateLeaf.Type.DECIMAL - case _ => throw new UnsupportedOperationException(s"DataType: ${dataType.catalogString}") - } - - /** - * Cast literal values for filters. - * - * We need to cast to long because ORC raises exceptions - * at 'checkLiteralType' of SearchArgumentImpl.java. - */ - private def castLiteralValue(value: Any, dataType: DataType): Any = dataType match { - case ByteType | ShortType | IntegerType | LongType => - value.asInstanceOf[Number].longValue - case FloatType | DoubleType => - value.asInstanceOf[Number].doubleValue() - case _: DecimalType => - new HiveDecimalWritable(HiveDecimal.create(value.asInstanceOf[java.math.BigDecimal])) - case _: DateType if value.isInstanceOf[LocalDate] => - toJavaDate(localDateToDays(value.asInstanceOf[LocalDate])) - case _: TimestampType if value.isInstanceOf[Instant] => - toJavaTimestamp(instantToMicros(value.asInstanceOf[Instant])) - case _ => value - } - - /** - * Build a SearchArgument and return the builder so far. - * - * @param dataTypeMap a map from the attribute name to its data type. - * @param expression the input predicates, which should be fully convertible to SearchArgument. - * @param builder the input SearchArgument.Builder. - * @return the builder so far. 
- */ - private def buildSearchArgument( - dataTypeMap: Map[String, OrcPrimitiveField], - expression: Filter, - builder: Builder): Builder = { - import org.apache.spark.sql.sources._ - - expression match { - case And(left, right) => - val lhs = buildSearchArgument(dataTypeMap, left, builder.startAnd()) - val rhs = buildSearchArgument(dataTypeMap, right, lhs) - rhs.end() - - case Or(left, right) => - val lhs = buildSearchArgument(dataTypeMap, left, builder.startOr()) - val rhs = buildSearchArgument(dataTypeMap, right, lhs) - rhs.end() - - case Not(child) => - buildSearchArgument(dataTypeMap, child, builder.startNot()).end() - - case other => - buildLeafSearchArgument(dataTypeMap, other, builder).getOrElse { - throw new SparkException( - "The input filter of OrcFilters.buildSearchArgument should be fully convertible.") - } - } - } - - /** - * Build a SearchArgument for a leaf predicate and return the builder so far. - * - * @param dataTypeMap a map from the attribute name to its data type. - * @param expression the input filter predicates. - * @param builder the input SearchArgument.Builder. - * @return the builder so far. - */ - private def buildLeafSearchArgument( - dataTypeMap: Map[String, OrcPrimitiveField], - expression: Filter, - builder: Builder): Option[Builder] = { - def getType(attribute: String): PredicateLeaf.Type = - getPredicateLeafType(dataTypeMap(attribute).fieldType) - - import org.apache.spark.sql.sources._ - - // NOTE: For all case branches dealing with leaf predicates below, the additional `startAnd()` - // call is mandatory. ORC `SearchArgument` builder requires that all leaf predicates must be - // wrapped by a "parent" predicate (`And`, `Or`, or `Not`). - expression match { - case EqualTo(name, value) if dataTypeMap.contains(name) => - val castedValue = castLiteralValue(value, dataTypeMap(name).fieldType) - Some(builder.startAnd() - .equals(dataTypeMap(name).fieldName, getType(name), castedValue).end()) - - case EqualNullSafe(name, value) if dataTypeMap.contains(name) => - val castedValue = castLiteralValue(value, dataTypeMap(name).fieldType) - Some(builder.startAnd() - .nullSafeEquals(dataTypeMap(name).fieldName, getType(name), castedValue).end()) - - case LessThan(name, value) if dataTypeMap.contains(name) => - val castedValue = castLiteralValue(value, dataTypeMap(name).fieldType) - Some(builder.startAnd() - .lessThan(dataTypeMap(name).fieldName, getType(name), castedValue).end()) - - case LessThanOrEqual(name, value) if dataTypeMap.contains(name) => - val castedValue = castLiteralValue(value, dataTypeMap(name).fieldType) - Some(builder.startAnd() - .lessThanEquals(dataTypeMap(name).fieldName, getType(name), castedValue).end()) - - case GreaterThan(name, value) if dataTypeMap.contains(name) => - val castedValue = castLiteralValue(value, dataTypeMap(name).fieldType) - Some(builder.startNot() - .lessThanEquals(dataTypeMap(name).fieldName, getType(name), castedValue).end()) - - case GreaterThanOrEqual(name, value) if dataTypeMap.contains(name) => - val castedValue = castLiteralValue(value, dataTypeMap(name).fieldType) - Some(builder.startNot() - .lessThan(dataTypeMap(name).fieldName, getType(name), castedValue).end()) - - case IsNull(name) if dataTypeMap.contains(name) => - Some(builder.startAnd() - .isNull(dataTypeMap(name).fieldName, getType(name)).end()) - - case IsNotNull(name) if dataTypeMap.contains(name) => - Some(builder.startNot() - .isNull(dataTypeMap(name).fieldName, getType(name)).end()) - - case In(name, values) if dataTypeMap.contains(name) => - val 
castedValues = values.map(v => castLiteralValue(v, dataTypeMap(name).fieldType)) - Some(builder.startAnd().in(dataTypeMap(name).fieldName, getType(name), - castedValues.map(_.asInstanceOf[AnyRef]): _*).end()) - - case _ => None - } - } -} diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/OrcFiltersBase.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/OrcFiltersBase.scala deleted file mode 100644 index d4fb2f260d6..00000000000 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/OrcFiltersBase.scala +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Copyright (c) 2021, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.sql.rapids - -import java.util.Locale - -import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap -import org.apache.spark.sql.sources.{And, Filter} -import org.apache.spark.sql.types.{AtomicType, BinaryType, DataType, StructField, StructType} - -/** - * Methods that can be shared when upgrading the built-in Hive. - * - * Derived from Apache Spark to avoid depending upon it directly, - * since its API has changed between Spark versions. - */ -trait OrcFiltersBase { - - private[sql] def buildTree(filters: Seq[Filter]): Option[Filter] = { - filters match { - case Seq() => None - case Seq(filter) => Some(filter) - case Seq(filter1, filter2) => Some(And(filter1, filter2)) - case _ => // length > 2 - val (left, right) = filters.splitAt(filters.length / 2) - Some(And(buildTree(left).get, buildTree(right).get)) - } - } - - case class OrcPrimitiveField(fieldName: String, fieldType: DataType) - - /** - * This method returns a map which contains ORC field name and data type. Each key - * represents a column; `dots` are used as separators for nested columns. If any part - * of the names contains `dots`, it is quoted to avoid confusion. See - * `org.apache.spark.sql.connector.catalog.quoted` for implementation details. - * - * BinaryType, UserDefinedType, ArrayType and MapType are ignored. - */ - protected[sql] def getSearchableTypeMap( - schema: StructType, - caseSensitive: Boolean): Map[String, OrcPrimitiveField] = { - import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper - - def getPrimitiveFields( - fields: Seq[StructField], - parentFieldNames: Seq[String] = Seq.empty): Seq[(String, OrcPrimitiveField)] = { - fields.flatMap { f => - f.dataType match { - case st: StructType => - getPrimitiveFields(st.fields, parentFieldNames :+ f.name) - case BinaryType => None - case _: AtomicType => - val fieldName = (parentFieldNames :+ f.name).quoted - val orcField = OrcPrimitiveField(fieldName, f.dataType) - Some((fieldName, orcField)) - case _ => None - } - } - } - - val primitiveFields = getPrimitiveFields(schema.fields) - if (caseSensitive) { - primitiveFields.toMap - } else { - // Don't consider ambiguity here, i.e. 
-      // Don't consider ambiguity here, i.e. more than one field are matched in case insensitive
-      // mode, just skip pushdown for these fields, they will trigger Exception when reading,
-      // See: SPARK-25175.
-      val dedupPrimitiveFields = primitiveFields
-        .groupBy(_._1.toLowerCase(Locale.ROOT))
-        .filter(_._2.size == 1)
-        .mapValues(_.head._2)
-      CaseInsensitiveMap(dedupPrimitiveFields)
-    }
-  }
-}
diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastExchangeExec.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastExchangeExec.scala
index a3ba5724c66..f3e5737c0ca 100644
--- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastExchangeExec.scala
+++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastExchangeExec.scala
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2019-2021, NVIDIA CORPORATION.
+ * Copyright (c) 2019-2022, NVIDIA CORPORATION.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -26,7 +26,6 @@ import scala.util.control.NonFatal
 
 import ai.rapids.cudf.{HostMemoryBuffer, JCudfSerialization, NvtxColor, NvtxRange}
 import ai.rapids.cudf.JCudfSerialization.SerializedTableHeader
-import com.google.common.util.concurrent.ThreadFactoryBuilder
 import com.nvidia.spark.rapids._
 import com.nvidia.spark.rapids.GpuMetric._
 import com.nvidia.spark.rapids.RapidsPluginImplicits._
diff --git a/tests/pom.xml b/tests/pom.xml
index 356d2b18156..3329f0dee7c 100644
--- a/tests/pom.xml
+++ b/tests/pom.xml
@@ -54,12 +54,6 @@
             ${cuda.version}
             provided
-
-            com.nvidia
-            rapids-4-spark_${scala.binary.version}
-            ${project.version}
-            provided
-
             com.nvidia
diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/OrcScanSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/OrcScanSuite.scala
index 997409412fb..a94affbf08d 100644
--- a/tests/src/test/scala/com/nvidia/spark/rapids/OrcScanSuite.scala
+++ b/tests/src/test/scala/com/nvidia/spark/rapids/OrcScanSuite.scala
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2019-2021, NVIDIA CORPORATION.
+ * Copyright (c) 2019-2022, NVIDIA CORPORATION.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -106,6 +106,9 @@ class OrcScanSuite extends SparkQueryCompareTestSuite {
    * is actually 1582-09-23 in proleptic Gregorian calendar.
    */
   test("test hybrid Julian Gregorian calendar vs proleptic Gregorian calendar") {
+    // Since Spark 3.1.1, ORC fails to prune when converting the hybrid Julian calendar to the
+    // proleptic Gregorian calendar. ORC bug: https://issues.apache.org/jira/browse/ORC-1083
+    assumePriorToSpark311
     withCpuSparkSession(spark => {
       val df = frameFromOrcWithSchema("hybrid-Julian-calendar.orc",
diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/SparkQueryCompareTestSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/SparkQueryCompareTestSuite.scala
index 1f84b04ad77..b8357c9db15 100644
--- a/tests/src/test/scala/com/nvidia/spark/rapids/SparkQueryCompareTestSuite.scala
+++ b/tests/src/test/scala/com/nvidia/spark/rapids/SparkQueryCompareTestSuite.scala
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2019-2021, NVIDIA CORPORATION.
+ * Copyright (c) 2019-2022, NVIDIA CORPORATION.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -1835,6 +1835,9 @@ trait SparkQueryCompareTestSuite extends FunSuite with Arm {
   def assumeSpark320orLater: Assertion =
     assume(VersionUtils.isSpark320OrLater, "Spark version not 3.2.0+")
 
+  def assumePriorToSpark311: Assertion =
+    assume(!VersionUtils.isSpark311OrLater, "Spark version not before 3.1.1")
+
   def cmpSparkVersion(major: Int, minor: Int, bugfix: Int): Int = {
     val sparkShimVersion = ShimLoader.getSparkShims.getSparkShimVersion
     val (sparkMajor, sparkMinor, sparkBugfix) = sparkShimVersion match {
diff --git a/tools/pom.xml b/tools/pom.xml
index d8fffb17ccb..25fe91d7f5f 100644
--- a/tools/pom.xml
+++ b/tools/pom.xml
@@ -40,6 +40,11 @@
+
+            com.nvidia
+            rapids-4-spark-common_${scala.binary.version}
+            ${project.version}
+
             org.scala-lang
             scala-library
@@ -100,18 +105,14 @@
                         org.rogach:scallop_${scala.binary.version}
+                        com.nvidia:rapids-4-spark-common_${scala.binary.version}
-
-                        org.rogach:scallop_${scala.binary.version}:*
-
-                            META-INF/*.MF
-
-                        *:*
+                            META-INF/*.MF
                             META-INF/*.SF
                             META-INF/*.DSA
                             META-INF/*.RSA
diff --git a/tools/src/main/scala/com/nvidia/spark/rapids/tool/profiling/Profiler.scala b/tools/src/main/scala/com/nvidia/spark/rapids/tool/profiling/Profiler.scala
index c8ef1f7cd9d..137e35a8f7b 100644
--- a/tools/src/main/scala/com/nvidia/spark/rapids/tool/profiling/Profiler.scala
+++ b/tools/src/main/scala/com/nvidia/spark/rapids/tool/profiling/Profiler.scala
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2021, NVIDIA CORPORATION.
+ * Copyright (c) 2021-2022, NVIDIA CORPORATION.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -22,7 +22,7 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable.{ArrayBuffer, HashMap}
 import scala.util.control.NonFatal
 
-import com.google.common.util.concurrent.ThreadFactoryBuilder
+import com.nvidia.spark.rapids.ThreadFactoryBuilder
 import com.nvidia.spark.rapids.tool.{EventLogInfo, EventLogPathProcessor}
 import org.apache.hadoop.conf.Configuration
diff --git a/tools/src/main/scala/com/nvidia/spark/rapids/tool/qualification/Qualification.scala b/tools/src/main/scala/com/nvidia/spark/rapids/tool/qualification/Qualification.scala
index 33579a98b3d..a895d28fe69 100644
--- a/tools/src/main/scala/com/nvidia/spark/rapids/tool/qualification/Qualification.scala
+++ b/tools/src/main/scala/com/nvidia/spark/rapids/tool/qualification/Qualification.scala
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2021, NVIDIA CORPORATION.
+ * Copyright (c) 2021-2022, NVIDIA CORPORATION.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -20,7 +20,7 @@ import java.util.concurrent.{ConcurrentLinkedQueue, Executors, ThreadPoolExecuto
 
 import scala.collection.JavaConverters._
 
-import com.google.common.util.concurrent.ThreadFactoryBuilder
+import com.nvidia.spark.rapids.ThreadFactoryBuilder
 import com.nvidia.spark.rapids.tool.EventLogInfo
 import org.apache.hadoop.conf.Configuration
diff --git a/tools/src/main/scala/org/apache/spark/sql/rapids/tool/AppFilterImpl.scala b/tools/src/main/scala/org/apache/spark/sql/rapids/tool/AppFilterImpl.scala
index 2b95e7639cd..5ebae2a075b 100644
--- a/tools/src/main/scala/org/apache/spark/sql/rapids/tool/AppFilterImpl.scala
+++ b/tools/src/main/scala/org/apache/spark/sql/rapids/tool/AppFilterImpl.scala
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2021, NVIDIA CORPORATION.
+ * Copyright (c) 2021-2022, NVIDIA CORPORATION.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -22,7 +22,7 @@ import java.util.regex.PatternSyntaxException
 
 import scala.collection.JavaConverters._
 
-import com.google.common.util.concurrent.ThreadFactoryBuilder
+import com.nvidia.spark.rapids.ThreadFactoryBuilder
 import com.nvidia.spark.rapids.tool.EventLogInfo
 import com.nvidia.spark.rapids.tool.qualification.QualificationArgs
 import org.apache.hadoop.conf.Configuration
diff --git a/udf-examples/pom.xml b/udf-examples/pom.xml
index e5bc938f9f7..f44cce94e81 100644
--- a/udf-examples/pom.xml
+++ b/udf-examples/pom.xml
@@ -142,6 +142,12 @@
             ${spark.version}
             provided
+
+            org.apache.hive
+            hive-storage-api
+            ${spark.version}
+            provided
+