fix reflection bug: pass in different arguments for different versions of the same function #1037

Merged
165 changes: 97 additions & 68 deletions core/src/main/scala/com/pingcap/tispark/utils/ReflectionUtil.scala
@@ -43,85 +43,114 @@ object ReflectionUtil {
   // isOrderSensitive: Boolean = false): RDD[U]
   //
   // Hereby we use reflection to support different Spark versions.
-  private val mapPartitionsWithIndexInternal: Method = spark_version match {
-    case "2.3.0" | "2.3.1" =>
-      tryLoadMethod(
-        "mapPartitionsWithIndexInternal",
-        mapPartitionsWithIndexInternalV1,
-        mapPartitionsWithIndexInternalV2
-      )
-    case _ =>
-      // Spark version >= 2.3.2
-      tryLoadMethod(
-        "mapPartitionsWithIndexInternal",
-        mapPartitionsWithIndexInternalV2,
-        mapPartitionsWithIndexInternalV1
-      )
-  }
+  case class ReflectionMapPartitionWithIndexInternal(
+      rdd: RDD[InternalRow],
+      internalRowToUnsafeRowWithIndex: (Int, Iterator[InternalRow]) => Iterator[UnsafeRow]
+  ) {
+    // Spark HDP Release may not compatible with official Release
+    // see https://github.com/pingcap/tispark/issues/1006
+    def invoke(): RDD[InternalRow] = {
+      val (version, method) = spark_version match {
+        case "2.3.0" | "2.3.1" =>
+          try {
+            reflectMapPartitionsWithIndexInternalV1(rdd, internalRowToUnsafeRowWithIndex)
+          } catch {
+            case _: Throwable =>
Review thread on the catch-all above:

Contributor: We catch all exceptions here. What if the exception thrown by Spark is not what we want?

Collaborator (Author): nice! fixed

+              try {
+                reflectMapPartitionsWithIndexInternalV2(rdd, internalRowToUnsafeRowWithIndex)
+              } catch {
+                case _: Throwable =>
+                  throw ScalaReflectionException(
+                    s"Cannot find reflection of Method mapPartitionsWithIndexInternal, current Spark version is %s"
+                      .format(spark_version)
+                  )
+              }
+          }
+
+        case _ =>
+          try {
+            reflectMapPartitionsWithIndexInternalV2(rdd, internalRowToUnsafeRowWithIndex)
+          } catch {
+            case _: Throwable =>
+              try {
+                reflectMapPartitionsWithIndexInternalV1(rdd, internalRowToUnsafeRowWithIndex)
+              } catch {
+                case _: Throwable =>
+                  throw ScalaReflectionException(
+                    s"Cannot find reflection of Method mapPartitionsWithIndexInternal, current Spark version is %s"
+                      .format(spark_version)
+                  )
+              }
+          }
+      }
 
-  // Spark HDP Release may not compatible with official Release
-  // see https://github.com/pingcap/tispark/issues/1006
-  private def tryLoadMethod(name: String, f1: () => Method, f2: () => Method): Method = {
-    try {
-      f1.apply()
-    } catch {
-      case _: Throwable =>
-        try {
-          f2.apply()
-        } catch {
-          case _: Throwable =>
-            throw ScalaReflectionException(
-              s"Cannot find reflection of Method $name, current Spark version is %s"
-                .format(spark_version)
-            )
-        }
+      invokeMapPartitionsWithIndexInternal(version, method, rdd, internalRowToUnsafeRowWithIndex)
     }
   }
 
   // Spark-2.3.0 & Spark-2.3.1
-  private def mapPartitionsWithIndexInternalV1(): Method =
-    classOf[RDD[InternalRow]].getDeclaredMethod(
-      "mapPartitionsWithIndexInternal",
-      classOf[(Int, Iterator[InternalRow]) => Iterator[UnsafeRow]],
-      classOf[Boolean],
-      classOf[ClassTag[UnsafeRow]]
-    )
+  private def reflectMapPartitionsWithIndexInternalV1(
+      rdd: RDD[InternalRow],
+      internalRowToUnsafeRowWithIndex: (Int, Iterator[InternalRow]) => Iterator[UnsafeRow]
+  ): (String, Method) = {
+    (
+      "v1",
+      classOf[RDD[InternalRow]].getDeclaredMethod(
+        "mapPartitionsWithIndexInternal",
+        classOf[(Int, Iterator[InternalRow]) => Iterator[UnsafeRow]],
+        classOf[Boolean],
+        classOf[ClassTag[UnsafeRow]]
+      )
+    )
+  }
 
   // >= Spark-2.3.2
-  private def mapPartitionsWithIndexInternalV2(): Method =
-    classOf[RDD[InternalRow]].getDeclaredMethod(
-      "mapPartitionsWithIndexInternal",
-      classOf[(Int, Iterator[InternalRow]) => Iterator[UnsafeRow]],
-      classOf[Boolean],
-      classOf[Boolean],
-      classOf[ClassTag[UnsafeRow]]
-    )
+  private def reflectMapPartitionsWithIndexInternalV2(
+      rdd: RDD[InternalRow],
+      internalRowToUnsafeRowWithIndex: (Int, Iterator[InternalRow]) => Iterator[UnsafeRow]
+  ): (String, Method) = {
+    (
+      "v2",
+      classOf[RDD[InternalRow]].getDeclaredMethod(
+        "mapPartitionsWithIndexInternal",
+        classOf[(Int, Iterator[InternalRow]) => Iterator[UnsafeRow]],
+        classOf[Boolean],
+        classOf[Boolean],
+        classOf[ClassTag[UnsafeRow]]
+      )
+    )
+  }
 
-  case class ReflectionMapPartitionWithIndexInternal(
+  private def invokeMapPartitionsWithIndexInternal(
+      version: String,
+      method: Method,
       rdd: RDD[InternalRow],
       internalRowToUnsafeRowWithIndex: (Int, Iterator[InternalRow]) => Iterator[UnsafeRow]
-  ) {
-    def invoke(): RDD[InternalRow] =
-      spark_version match {
-        case "2.3.0" | "2.3.1" =>
-          mapPartitionsWithIndexInternal
-            .invoke(
-              rdd,
-              internalRowToUnsafeRowWithIndex,
-              Boolean.box(false),
-              ClassTag.apply(classOf[UnsafeRow])
-            )
-            .asInstanceOf[RDD[InternalRow]]
-        case _ =>
-          mapPartitionsWithIndexInternal
-            .invoke(
-              rdd,
-              internalRowToUnsafeRowWithIndex,
-              Boolean.box(false),
-              Boolean.box(false),
-              ClassTag.apply(classOf[UnsafeRow])
-            )
-            .asInstanceOf[RDD[InternalRow]]
-      }
+  ): RDD[InternalRow] = {
+    version match {
+      case "v1" =>
+        // Spark-2.3.0 & Spark-2.3.1
+        method
+          .invoke(
+            rdd,
+            internalRowToUnsafeRowWithIndex,
+            Boolean.box(false),
+            ClassTag.apply(classOf[UnsafeRow])
+          )
+          .asInstanceOf[RDD[InternalRow]]
+
+      case _ =>
+        // >= Spark-2.3.2
+        method
+          .invoke(
+            rdd,
+            internalRowToUnsafeRowWithIndex,
+            Boolean.box(false),
+            Boolean.box(false),
+            ClassTag.apply(classOf[UnsafeRow])
+          )
+          .asInstanceOf[RDD[InternalRow]]
+    }
   }
 }
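
Note for readers skimming the hunk: the whole trick is to probe, via reflection, for whichever `mapPartitionsWithIndexInternal` overload the running Spark actually declares, since Spark 2.3.2 added an `isOrderSensitive` parameter. A minimal, self-contained sketch of that probing step (the object and method names here are invented for illustration; only the `getDeclaredMethod` calls mirror the diff):

```scala
import java.lang.reflect.Method

import scala.reflect.ClassTag

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.UnsafeRow

object ReflectionSketch {
  // Probe for the 4-parameter overload (Spark >= 2.3.2, with isOrderSensitive);
  // fall back to the 3-parameter one (Spark 2.3.0/2.3.1) only when it is absent.
  def loadMapPartitionsWithIndexInternal(): Method =
    try {
      classOf[RDD[InternalRow]].getDeclaredMethod(
        "mapPartitionsWithIndexInternal",
        classOf[(Int, Iterator[InternalRow]) => Iterator[UnsafeRow]],
        classOf[Boolean], // preservesPartitioning
        classOf[Boolean], // isOrderSensitive
        classOf[ClassTag[UnsafeRow]]
      )
    } catch {
      case _: NoSuchMethodException =>
        classOf[RDD[InternalRow]].getDeclaredMethod(
          "mapPartitionsWithIndexInternal",
          classOf[(Int, Iterator[InternalRow]) => Iterator[UnsafeRow]],
          classOf[Boolean], // preservesPartitioning
          classOf[ClassTag[UnsafeRow]]
        )
    }
}
```

Catching `NoSuchMethodException` rather than `Throwable` is the narrower form the review comment above asks for: `getDeclaredMethod` signals a missing overload with exactly that exception, so unrelated failures still propagate.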
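
A related design observation: the merged code threads a `"v1"`/`"v2"` tag from the lookup to the invoke site, so the argument list always matches the signature that actually resolved rather than trusting `spark_version` alone; that is what keeps vendor builds with back-ported signatures working (issue #1006). A hypothetical variant of the same idea (not the PR's code) derives the argument list from the resolved `Method` itself:

```scala
import java.lang.reflect.Method

import scala.reflect.ClassTag

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.UnsafeRow

object InvokeSketch {
  // Hypothetical helper: pick the argument list from the method's own arity,
  // so whichever overload was found gets exactly the arguments it expects.
  def invokeResolved(
      method: Method,
      rdd: RDD[InternalRow],
      f: (Int, Iterator[InternalRow]) => Iterator[UnsafeRow]): RDD[InternalRow] = {
    val args: Seq[AnyRef] =
      if (method.getParameterCount == 4) {
        // f, preservesPartitioning = false, isOrderSensitive = false, ClassTag
        Seq(f, Boolean.box(false), Boolean.box(false), ClassTag.apply(classOf[UnsafeRow]))
      } else {
        // f, preservesPartitioning = false, ClassTag
        Seq(f, Boolean.box(false), ClassTag.apply(classOf[UnsafeRow]))
      }
    method.invoke(rdd, args: _*).asInstanceOf[RDD[InternalRow]]
  }
}
```

Either way the receiver is the `RDD` instance, and the boxed `false` values fill the `preservesPartitioning` (and, on newer Sparks, `isOrderSensitive`) parameters, exactly as the merged diff does.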