[BUG] test_cache_expand_exec fails on Spark 3.3 #5429

Closed
jlowe opened this issue May 5, 2022 · 0 comments · Fixed by #5425
Labels: bug (Something isn't working), P0 (Must have for release)

Comments

jlowe (Member) commented May 5, 2022

Last night's Spark 3.3 test run failed with errors in test_cache_expand_exec. For example:

07:37:48  _ test_cache_expand_exec[{'spark.sql.inMemoryColumnarStorage.enableVectorizedReader': 'false'}-Decimal(12,2)] _
07:37:48  
07:37:48  data_gen = Decimal(12,2)
07:37:48  enable_vectorized_conf = {'spark.sql.inMemoryColumnarStorage.enableVectorizedReader': 'false'}
07:37:48  
07:37:48      @pytest.mark.parametrize('data_gen', all_gen, ids=idfn)
07:37:48      @pytest.mark.parametrize('enable_vectorized_conf', enable_vectorized_confs, ids=idfn)
07:37:48      @ignore_order
07:37:48      def test_cache_expand_exec(data_gen, enable_vectorized_conf):
07:37:48          def op_df(spark, length=2048, seed=0):
07:37:48              cached = gen_df(spark, StructGen([
07:37:48                  ('a', data_gen),
07:37:48                  ('b', IntegerGen())], nullable=False), length=length, seed=seed).cache()
07:37:48              cached.count() # populate the cache
07:37:48              return cached.rollup(f.col("a"), f.col("b")).agg(f.col("b"))
07:37:48      
07:37:48  >       assert_gpu_and_cpu_are_equal_collect(op_df, conf=enable_vectorized_conf)
07:37:48  
07:37:48  ../../src/main/python/cache_test.py:101: 
07:37:48  _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
07:37:48  ../../src/main/python/asserts.py:508: in assert_gpu_and_cpu_are_equal_collect
07:37:48      _assert_gpu_and_cpu_are_equal(func, 'COLLECT', conf=conf, is_cpu_first=is_cpu_first)
07:37:48  ../../src/main/python/asserts.py:427: in _assert_gpu_and_cpu_are_equal
07:37:48      run_on_cpu()
07:37:48  ../../src/main/python/asserts.py:413: in run_on_cpu
07:37:48      from_cpu = with_cpu_session(bring_back, conf=conf)
07:37:48  ../../src/main/python/spark_session.py:114: in with_cpu_session
07:37:48      return with_spark_session(func, conf=copy)
07:37:48  ../../src/main/python/spark_session.py:98: in with_spark_session
07:37:48      ret = func(_spark)
07:37:48  ../../src/main/python/asserts.py:201: in <lambda>
07:37:48      bring_back = lambda spark: limit_func(spark).collect()
07:37:48  ../../src/main/python/asserts.py:184: in with_sorted
07:37:48      df = func(spark)
07:37:48  ../../src/main/python/cache_test.py:98: in op_df
07:37:48      cached.count() # populate the cache
07:37:48  /home/jenkins/agent/workspace/jenkins-rapids_it-3.3.x-SNAPSHOT-dev-github-80/jars/spark-3.3.0-SNAPSHOT-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/sql/dataframe.py:804: in count
07:37:48      return int(self._jdf.count())
07:37:48  /home/jenkins/agent/workspace/jenkins-rapids_it-3.3.x-SNAPSHOT-dev-github-80/jars/spark-3.3.0-SNAPSHOT-bin-hadoop3.2/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py:1321: in __call__
07:37:48      return_value = get_return_value(
07:37:48  /home/jenkins/agent/workspace/jenkins-rapids_it-3.3.x-SNAPSHOT-dev-github-80/jars/spark-3.3.0-SNAPSHOT-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/sql/utils.py:190: in deco
07:37:48      return f(*a, **kw)
07:37:48  _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
07:37:48  
07:37:48  answer = 'xro2660'
07:37:48  gateway_client = <py4j.clientserver.JavaClient object at 0x7f2e05588220>
07:37:48  target_id = 'o2658', name = 'count'
07:37:48  
07:37:48      def get_return_value(answer, gateway_client, target_id=None, name=None):
07:37:48          """Converts an answer received from the Java gateway into a Python object.
07:37:48      
07:37:48          For example, string representation of integers are converted to Python
07:37:48          integer, string representation of objects are converted to JavaObject
07:37:48          instances, etc.
07:37:48      
07:37:48          :param answer: the string returned by the Java gateway
07:37:48          :param gateway_client: the gateway client used to communicate with the Java
07:37:48              Gateway. Only necessary if the answer is a reference (e.g., object,
07:37:48              list, map)
07:37:48          :param target_id: the name of the object from which the answer comes from
07:37:48              (e.g., *object1* in `object1.hello()`). Optional.
07:37:48          :param name: the name of the member from which the answer comes from
07:37:48              (e.g., *hello* in `object1.hello()`). Optional.
07:37:48          """
07:37:48          if is_error(answer)[0]:
07:37:48              if len(answer) > 1:
07:37:48                  type = answer[1]
07:37:48                  value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
07:37:48                  if answer[1] == REFERENCE_TYPE:
07:37:48  >                   raise Py4JJavaError(
07:37:48                          "An error occurred while calling {0}{1}{2}.\n".
07:37:48                          format(target_id, ".", name), value)
07:37:48  E                   py4j.protocol.Py4JJavaError: An error occurred while calling o2658.count.
07:37:48  E                   : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 48.0 failed 1 times, most recent failure: Lost task 0.0 in stage 48.0 (TID 48) (10.233.91.240 executor 0): java.lang.NoSuchMethodException: org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readBatch(int, org.apache.spark.sql.execution.vectorized.WritableColumnVector)
07:37:48  E                   	at java.lang.Class.getDeclaredMethod(Class.java:2130)
07:37:48  E                   	at com.nvidia.spark.rapids.ParquetCachedBatchSerializer$CachedBatchIteratorConsumer.<init>(ParquetCachedBatchSerializer.scala:693)
07:37:48  E                   	at com.nvidia.spark.rapids.ParquetCachedBatchSerializer.$anonfun$convertCachedBatchToInternalRow$1(ParquetCachedBatchSerializer.scala:595)
07:37:48  E                   	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:855)
07:37:48  E                   	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:855)
07:37:48  E                   	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
07:37:48  E                   	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
07:37:48  E                   	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
07:37:48  E                   	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
07:37:48  E                   	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
07:37:48  E                   	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
07:37:48  E                   	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
07:37:48  E                   	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
07:37:48  E                   	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
07:37:48  E                   	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
07:37:48  E                   	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
07:37:48  E                   	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
07:37:48  E                   	at org.apache.spark.scheduler.Task.run(Task.scala:136)
07:37:48  E                   	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
07:37:48  E                   	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
07:37:48  E                   	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
07:37:48  E                   	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
07:37:48  E                   	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
07:37:48  E                   	at java.lang.Thread.run(Thread.java:748)
07:37:48  E                   
07:37:48  E                   Driver stacktrace:
07:37:48  E                   	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2672)
07:37:48  E                   	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2608)
07:37:48  E                   	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2607)
07:37:48  E                   	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
07:37:48  E                   	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
07:37:48  E                   	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
07:37:48  E                   	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2607)
07:37:48  E                   	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1182)
07:37:48  E                   	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1182)
07:37:48  E                   	at scala.Option.foreach(Option.scala:407)
07:37:48  E                   	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1182)
07:37:48  E                   	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2860)
07:37:48  E                   	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2802)
07:37:48  E                   	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2791)
07:37:48  E                   	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
07:37:48  E                   	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:952)
07:37:48  E                   	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2228)
07:37:48  E                   	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2249)
07:37:48  E                   	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2268)
07:37:48  E                   	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2293)
07:37:48  E                   	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1021)
07:37:48  E                   	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
07:37:48  E                   	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
07:37:48  E                   	at org.apache.spark.rdd.RDD.withScope(RDD.scala:406)
07:37:48  E                   	at org.apache.spark.rdd.RDD.collect(RDD.scala:1020)
07:37:48  E                   	at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:424)
07:37:48  E                   	at org.apache.spark.sql.Dataset.$anonfun$count$1(Dataset.scala:3161)
07:37:48  E                   	at org.apache.spark.sql.Dataset.$anonfun$count$1$adapted(Dataset.scala:3160)
07:37:48  E                   	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3856)
07:37:48  E                   	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
07:37:48  E                   	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
07:37:48  E                   	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
07:37:48  E                   	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
07:37:48  E                   	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
07:37:48  E                   	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3854)
07:37:48  E                   	at org.apache.spark.sql.Dataset.count(Dataset.scala:3160)
07:37:48  E                   	at sun.reflect.GeneratedMethodAccessor77.invoke(Unknown Source)
07:37:48  E                   	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
07:37:48  E                   	at java.lang.reflect.Method.invoke(Method.java:498)
07:37:48  E                   	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
07:37:48  E                   	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
07:37:48  E                   	at py4j.Gateway.invoke(Gateway.java:282)
07:37:48  E                   	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
07:37:48  E                   	at py4j.commands.CallCommand.execute(CallCommand.java:79)
07:37:48  E                   	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
07:37:48  E                   	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
07:37:48  E                   	at java.lang.Thread.run(Thread.java:748)
07:37:48  E                   Caused by: java.lang.NoSuchMethodException: org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readBatch(int, org.apache.spark.sql.execution.vectorized.WritableColumnVector)
07:37:48  E                   	at java.lang.Class.getDeclaredMethod(Class.java:2130)
07:37:48  E                   	at com.nvidia.spark.rapids.ParquetCachedBatchSerializer$CachedBatchIteratorConsumer.<init>(ParquetCachedBatchSerializer.scala:693)
07:37:48  E                   	at com.nvidia.spark.rapids.ParquetCachedBatchSerializer.$anonfun$convertCachedBatchToInternalRow$1(ParquetCachedBatchSerializer.scala:595)
07:37:48  E                   	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:855)
07:37:48  E                   	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:855)
07:37:48  E                   	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
07:37:48  E                   	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
07:37:48  E                   	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
07:37:48  E                   	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
07:37:48  E                   	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
07:37:48  E                   	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
07:37:48  E                   	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
07:37:48  E                   	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
07:37:48  E                   	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
07:37:48  E                   	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
07:37:48  E                   	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
07:37:48  E                   	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
07:37:48  E                   	at org.apache.spark.scheduler.Task.run(Task.scala:136)
07:37:48  E                   	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
07:37:48  E                   	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
07:37:48  E                   	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
07:37:48  E                   	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
07:37:48  E                   	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
07:37:48  E                   	... 1 more
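The key failure is the NoSuchMethodException thrown from Class.getDeclaredMethod inside ParquetCachedBatchSerializer$CachedBatchIteratorConsumer: the serializer reflectively looks up VectorizedColumnReader.readBatch(int, WritableColumnVector), and the Spark 3.3 build no longer has a method with that exact signature. A minimal, hypothetical Scala sketch of that style of lookup (the class and method names come from the stack trace above; the surrounding code shape is an assumption, not the plugin's actual implementation) shows why the task dies at cache-read time on 3.3:

    // Hypothetical sketch of the reflection lookup implied by the stack trace.
    // Only the class and method names are taken from the error; everything else is assumed.
    val readerClass = Class.forName(
      "org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader")
    val vectorClass = Class.forName(
      "org.apache.spark.sql.execution.vectorized.WritableColumnVector")

    // Earlier Spark versions expose readBatch(int, WritableColumnVector); on the
    // Spark 3.3 snapshot this exact signature is gone, so the lookup throws
    // NoSuchMethodException when the task tries to read the cached batch back.
    val readBatch = readerClass.getDeclaredMethod(
      "readBatch", java.lang.Integer.TYPE, vectorClass)
    readBatch.setAccessible(true)

A version-aware lookup would need to probe for the Spark 3.3 signature as well instead of assuming the older one; the fix tracked in #5425 presumably updates the serializer accordingly for the 3.3 build.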
jlowe added labels: bug (Something isn't working), ? - Needs Triage (Need team to review and classify), P0 (Must have for release) on May 5, 2022
sameerz removed label: ? - Needs Triage (Need team to review and classify) on May 10, 2022