From b301aa924bb696affec9530a3c8981c54401e3ba Mon Sep 17 00:00:00 2001 From: marsishandsome Date: Fri, 16 Aug 2019 15:09:55 +0800 Subject: [PATCH 1/2] fix reflection bug --- .../tispark/utils/ReflectionUtil.scala | 134 +++++++++--------- 1 file changed, 68 insertions(+), 66 deletions(-) diff --git a/core/src/main/scala/com/pingcap/tispark/utils/ReflectionUtil.scala b/core/src/main/scala/com/pingcap/tispark/utils/ReflectionUtil.scala index bc504a7e7b..682ab2d318 100644 --- a/core/src/main/scala/com/pingcap/tispark/utils/ReflectionUtil.scala +++ b/core/src/main/scala/com/pingcap/tispark/utils/ReflectionUtil.scala @@ -15,8 +15,6 @@ package com.pingcap.tispark.utils -import java.lang.reflect.Method - import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.UnsafeRow @@ -43,85 +41,89 @@ object ReflectionUtil { // isOrderSensitive: Boolean = false): RDD[U] // // Hereby we use reflection to support different Spark versions. - private val mapPartitionsWithIndexInternal: Method = spark_version match { - case "2.3.0" | "2.3.1" => - tryLoadMethod( - "mapPartitionsWithIndexInternal", - mapPartitionsWithIndexInternalV1, - mapPartitionsWithIndexInternalV2 - ) - case _ => - // Spark version >= 2.3.2 - tryLoadMethod( - "mapPartitionsWithIndexInternal", - mapPartitionsWithIndexInternalV2, - mapPartitionsWithIndexInternalV1 - ) - } - - // Spark HDP Release may not compatible with official Release - // see https://github.com/pingcap/tispark/issues/1006 - private def tryLoadMethod(name: String, f1: () => Method, f2: () => Method): Method = { - try { - f1.apply() - } catch { - case _: Throwable => - try { - f2.apply() - } catch { - case _: Throwable => - throw ScalaReflectionException( - s"Cannot find reflection of Method $name, current Spark version is %s" - .format(spark_version) - ) - } - } + case class ReflectionMapPartitionWithIndexInternal( + rdd: RDD[InternalRow], + internalRowToUnsafeRowWithIndex: (Int, Iterator[InternalRow]) => Iterator[UnsafeRow] + ) { + // Spark HDP Release may not compatible with official Release + // see https://github.com/pingcap/tispark/issues/1006 + def invoke(): RDD[InternalRow] = + spark_version match { + case "2.3.0" | "2.3.1" => + try { + mapPartitionsWithIndexInternalV1(rdd, internalRowToUnsafeRowWithIndex) + } catch { + case _: Throwable => + try { + mapPartitionsWithIndexInternalV2(rdd, internalRowToUnsafeRowWithIndex) + } catch { + case _: Throwable => + throw ScalaReflectionException( + s"Cannot find reflection of Method mapPartitionsWithIndexInternal, current Spark version is %s" + .format(spark_version) + ) + } + } + case _ => + try { + mapPartitionsWithIndexInternalV2(rdd, internalRowToUnsafeRowWithIndex) + } catch { + case _: Throwable => + try { + mapPartitionsWithIndexInternalV1(rdd, internalRowToUnsafeRowWithIndex) + } catch { + case _: Throwable => + throw ScalaReflectionException( + s"Cannot find reflection of Method mapPartitionsWithIndexInternal, current Spark version is %s" + .format(spark_version) + ) + } + } + } } // Spark-2.3.0 & Spark-2.3.1 - private def mapPartitionsWithIndexInternalV1(): Method = - classOf[RDD[InternalRow]].getDeclaredMethod( + private def mapPartitionsWithIndexInternalV1( + rdd: RDD[InternalRow], + internalRowToUnsafeRowWithIndex: (Int, Iterator[InternalRow]) => Iterator[UnsafeRow] + ): RDD[InternalRow] = { + val method = classOf[RDD[InternalRow]].getDeclaredMethod( "mapPartitionsWithIndexInternal", classOf[(Int, Iterator[InternalRow]) => Iterator[UnsafeRow]], classOf[Boolean], classOf[ClassTag[UnsafeRow]] ) + method + .invoke( + rdd, + internalRowToUnsafeRowWithIndex, + Boolean.box(false), + ClassTag.apply(classOf[UnsafeRow]) + ) + .asInstanceOf[RDD[InternalRow]] + + } // >= Spark-2.3.2 - private def mapPartitionsWithIndexInternalV2(): Method = - classOf[RDD[InternalRow]].getDeclaredMethod( + private def mapPartitionsWithIndexInternalV2( + rdd: RDD[InternalRow], + internalRowToUnsafeRowWithIndex: (Int, Iterator[InternalRow]) => Iterator[UnsafeRow] + ): RDD[InternalRow] = { + val method = classOf[RDD[InternalRow]].getDeclaredMethod( "mapPartitionsWithIndexInternal", classOf[(Int, Iterator[InternalRow]) => Iterator[UnsafeRow]], classOf[Boolean], classOf[Boolean], classOf[ClassTag[UnsafeRow]] ) - - case class ReflectionMapPartitionWithIndexInternal( - rdd: RDD[InternalRow], - internalRowToUnsafeRowWithIndex: (Int, Iterator[InternalRow]) => Iterator[UnsafeRow] - ) { - def invoke(): RDD[InternalRow] = - spark_version match { - case "2.3.0" | "2.3.1" => - mapPartitionsWithIndexInternal - .invoke( - rdd, - internalRowToUnsafeRowWithIndex, - Boolean.box(false), - ClassTag.apply(classOf[UnsafeRow]) - ) - .asInstanceOf[RDD[InternalRow]] - case _ => - mapPartitionsWithIndexInternal - .invoke( - rdd, - internalRowToUnsafeRowWithIndex, - Boolean.box(false), - Boolean.box(false), - ClassTag.apply(classOf[UnsafeRow]) - ) - .asInstanceOf[RDD[InternalRow]] - } + method + .invoke( + rdd, + internalRowToUnsafeRowWithIndex, + Boolean.box(false), + Boolean.box(false), + ClassTag.apply(classOf[UnsafeRow]) + ) + .asInstanceOf[RDD[InternalRow]] } } From babf974d21401f8b7a388cac9e6d3e43b7ffb9ad Mon Sep 17 00:00:00 2001 From: marsishandsome Date: Mon, 19 Aug 2019 17:35:12 +0800 Subject: [PATCH 2/2] address code review --- .../tispark/utils/ReflectionUtil.scala | 105 +++++++++++------- 1 file changed, 66 insertions(+), 39 deletions(-) diff --git a/core/src/main/scala/com/pingcap/tispark/utils/ReflectionUtil.scala b/core/src/main/scala/com/pingcap/tispark/utils/ReflectionUtil.scala index 682ab2d318..d60f01538a 100644 --- a/core/src/main/scala/com/pingcap/tispark/utils/ReflectionUtil.scala +++ b/core/src/main/scala/com/pingcap/tispark/utils/ReflectionUtil.scala @@ -15,6 +15,8 @@ package com.pingcap.tispark.utils +import java.lang.reflect.Method + import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.UnsafeRow @@ -47,15 +49,15 @@ object ReflectionUtil { ) { // Spark HDP Release may not compatible with official Release // see https://github.com/pingcap/tispark/issues/1006 - def invoke(): RDD[InternalRow] = - spark_version match { + def invoke(): RDD[InternalRow] = { + val (version, method) = spark_version match { case "2.3.0" | "2.3.1" => try { - mapPartitionsWithIndexInternalV1(rdd, internalRowToUnsafeRowWithIndex) + reflectMapPartitionsWithIndexInternalV1(rdd, internalRowToUnsafeRowWithIndex) } catch { case _: Throwable => try { - mapPartitionsWithIndexInternalV2(rdd, internalRowToUnsafeRowWithIndex) + reflectMapPartitionsWithIndexInternalV2(rdd, internalRowToUnsafeRowWithIndex) } catch { case _: Throwable => throw ScalaReflectionException( @@ -64,13 +66,14 @@ object ReflectionUtil { ) } } + case _ => try { - mapPartitionsWithIndexInternalV2(rdd, internalRowToUnsafeRowWithIndex) + reflectMapPartitionsWithIndexInternalV2(rdd, internalRowToUnsafeRowWithIndex) } catch { case _: Throwable => try { - mapPartitionsWithIndexInternalV1(rdd, internalRowToUnsafeRowWithIndex) + reflectMapPartitionsWithIndexInternalV1(rdd, internalRowToUnsafeRowWithIndex) } catch { case _: Throwable => throw ScalaReflectionException( @@ -80,50 +83,74 @@ object ReflectionUtil { } } } + + invokeMapPartitionsWithIndexInternal(version, method, rdd, internalRowToUnsafeRowWithIndex) + } } // Spark-2.3.0 & Spark-2.3.1 - private def mapPartitionsWithIndexInternalV1( + private def reflectMapPartitionsWithIndexInternalV1( rdd: RDD[InternalRow], internalRowToUnsafeRowWithIndex: (Int, Iterator[InternalRow]) => Iterator[UnsafeRow] - ): RDD[InternalRow] = { - val method = classOf[RDD[InternalRow]].getDeclaredMethod( - "mapPartitionsWithIndexInternal", - classOf[(Int, Iterator[InternalRow]) => Iterator[UnsafeRow]], - classOf[Boolean], - classOf[ClassTag[UnsafeRow]] - ) - method - .invoke( - rdd, - internalRowToUnsafeRowWithIndex, - Boolean.box(false), - ClassTag.apply(classOf[UnsafeRow]) + ): (String, Method) = { + ( + "v1", + classOf[RDD[InternalRow]].getDeclaredMethod( + "mapPartitionsWithIndexInternal", + classOf[(Int, Iterator[InternalRow]) => Iterator[UnsafeRow]], + classOf[Boolean], + classOf[ClassTag[UnsafeRow]] ) - .asInstanceOf[RDD[InternalRow]] - + ) } // >= Spark-2.3.2 - private def mapPartitionsWithIndexInternalV2( + private def reflectMapPartitionsWithIndexInternalV2( rdd: RDD[InternalRow], internalRowToUnsafeRowWithIndex: (Int, Iterator[InternalRow]) => Iterator[UnsafeRow] - ): RDD[InternalRow] = { - val method = classOf[RDD[InternalRow]].getDeclaredMethod( - "mapPartitionsWithIndexInternal", - classOf[(Int, Iterator[InternalRow]) => Iterator[UnsafeRow]], - classOf[Boolean], - classOf[Boolean], - classOf[ClassTag[UnsafeRow]] - ) - method - .invoke( - rdd, - internalRowToUnsafeRowWithIndex, - Boolean.box(false), - Boolean.box(false), - ClassTag.apply(classOf[UnsafeRow]) + ): (String, Method) = { + ( + "v2", + classOf[RDD[InternalRow]].getDeclaredMethod( + "mapPartitionsWithIndexInternal", + classOf[(Int, Iterator[InternalRow]) => Iterator[UnsafeRow]], + classOf[Boolean], + classOf[Boolean], + classOf[ClassTag[UnsafeRow]] ) - .asInstanceOf[RDD[InternalRow]] + ) + } + + private def invokeMapPartitionsWithIndexInternal( + version: String, + method: Method, + rdd: RDD[InternalRow], + internalRowToUnsafeRowWithIndex: (Int, Iterator[InternalRow]) => Iterator[UnsafeRow] + ): RDD[InternalRow] = { + version match { + case "v1" => + // Spark-2.3.0 & Spark-2.3.1 + method + .invoke( + rdd, + internalRowToUnsafeRowWithIndex, + Boolean.box(false), + ClassTag.apply(classOf[UnsafeRow]) + ) + .asInstanceOf[RDD[InternalRow]] + + case _ => + // >= Spark-2.3.2 + method + .invoke( + rdd, + internalRowToUnsafeRowWithIndex, + Boolean.box(false), + Boolean.box(false), + ClassTag.apply(classOf[UnsafeRow]) + ) + .asInstanceOf[RDD[InternalRow]] + } + } }