Dedupe proxy rapids shuffle manager byte code #3602

Merged
2 changes: 1 addition & 1 deletion dist/README.md
@@ -1,6 +1,6 @@
 ---
 layout: page
-title: Testing
+title: RAPIDS Accelerator for Apache Spark Distribution Packaging
 nav_order: 1
 parent: Developer Overview
 ---
123 changes: 123 additions & 0 deletions shims/README.md
@@ -0,0 +1,123 @@
---
layout: page
title: Shim Development
nav_order: 1
parent: Developer Overview
---
# Shim Development

RAPIDS Accelerator For Apache Spark supports multiple feature version lines of
Apache Spark, such as 3.1.x, 3.2.x, and a number of vendor releases that contain
a mix of patches from different upstream releases. These artifacts are generally
incompatible with each other, at the source code level and even more often
at the binary level. The role of the Shim layer is to hide these issues from the
common code, maximize reuse, and minimize logic duplication.

This is achieved by using a ServiceProvider pattern. All Shims implement the same API,
and the suitable Shim implementation is loaded after detecting the version of the Spark
build that is attempting to instantiate our plugin. We use the
[ShimLoader](../sql-plugin/src/main/scala/com/nvidia/spark/rapids/ShimLoader.scala)
class as a tight entry point for interacting with the host Spark runtime.

The following sections provide recipes for the typical scenarios addressed by the Shim layer.

## Method signature discrepancies

This is among the easiest issues to resolve. We define a method in the `SparkShims`
trait covering a superset of the parameters from all versions and call it
```
ShimLoader.getSparkShims.methodWithDiscrepancies(p_1, ..., p_n)
```
instead of referencing it directly. Shim implementations are in charge of dispatching it further
to the correct version-dependent methods. Moreover, unlike in the scenarios covered in the
sections below, conflicts between versions are easily avoided by using different package or
class names for the conflicting Shim implementations.
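
For illustration, here is a minimal sketch of this dispatch; the parameter types and the two
shim class names are hypothetical stand-ins, not the actual plugin code:
```
// A sketch, not the actual plugin code: the trait method takes the superset
// of parameters, and each shim forwards only what its Spark version accepts.
trait SparkShims {
  def methodWithDiscrepancies(p1: String, p2: Int, p3: Option[Long]): Unit
}

// Hypothetical shim for a version whose method takes only two parameters:
class Spark31XShims extends SparkShims {
  override def methodWithDiscrepancies(p1: String, p2: Int, p3: Option[Long]): Unit =
    spark31xMethod(p1, p2) // p3 has no counterpart on this version, so it is dropped
  private def spark31xMethod(p1: String, p2: Int): Unit = ()
}

// Hypothetical shim for a version that takes all three parameters:
class Spark32XShims extends SparkShims {
  override def methodWithDiscrepancies(p1: String, p2: Int, p3: Option[Long]): Unit =
    spark32xMethod(p1, p2, p3)
  private def spark32xMethod(p1: String, p2: Int, p3: Option[Long]): Unit = ()
}
```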

## Base Classes/Traits Changes

### Compile-time issues
Upstream base classes we derive from might be incompatible in the sense that one version
requires us to implement/override the method `M` whereas the other prohibits it by marking
the base implementation `final`. For example, `org.apache.spark.sql.catalyst.trees.TreeNode`
changes between Spark 3.1.x and Spark 3.2.x. To overcome this issue, instead of deriving from
such classes directly, we inject an intermediate trait, e.g.
`com.nvidia.spark.rapids.shims.v2.ShimExpression`, whose source code varies depending on the
Spark version we compile against, as you can see comparing the `TreeNode` shims (sketched
schematically after the list):
1. [ShimExpression for 3.0.x and 3.1.x](../sql-plugin/src/main/301until320-all/scala/com/nvidia/spark/rapids/shims/v2/TreeNode.scala#L23)
2. [ShimExpression for 3.2.x](../sql-plugin/src/main/301until320-all/scala/com/nvidia/spark/rapids/shims/v2/TreeNode.scala#L23)
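
A sketch of the idea, not the verbatim contents of those files:
```
// Source root compiled only against Spark 3.0.x/3.1.x:
package com.nvidia.spark.rapids.shims.v2

import org.apache.spark.sql.catalyst.expressions.Expression

trait ShimExpression extends Expression {
  // free to override the method that 3.2.x marks final
}

// The source root compiled against Spark 3.2.x declares the same fully
// qualified trait name, but with a body adapted to the 3.2.x TreeNode API:
// trait ShimExpression extends Expression {
//   // implements the new hook that the now-final upstream method delegates to
// }
```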

This resolves the compile-time problems; however, we now face a problem at run time.

### Run-time Issues

The plugin has to be able to deterministically load the right class files
for identically named classes depending on the detected
Spark runtime version. This is typically solved by ASM-based relocation of the bytecode;
however, that does not work easily with
[Scala packages](https://contributors.scala-lang.org/t/scala-signature-layout/3327/4).

So instead we resort to the idea behind the JDK's ParallelWorldClassloader, combined with the
fact that the Spark runtime uses mutable classloaders that we can alter after detecting the
runtime version. Using JarURLConnection URLs, we create a Parallel World for the current
version within the jar, e.g.:

Spark 3.0.2's URLs:
```
jar:file:/home/spark/rapids-4-spark_2.12-21.10.0.jar!/
jar:file:/home/spark/rapids-4-spark_2.12-21.10.0.jar!/spark3xx-common/
jar:file:/home/spark/rapids-4-spark_2.12-21.10.0.jar!/spark302/
```

Spark 3.2.0's URLs:
```
jar:file:/home/spark/rapids-4-spark_2.12-21.10.0.jar!/
jar:file:/home/spark/rapids-4-spark_2.12-21.10.0.jar!/spark3xx-common/
jar:file:/home/spark/rapids-4-spark_2.12-21.10.0.jar!/spark320/
```
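
As a sketch of the mechanics, the URLs above could be assembled as follows once the Spark build
version is detected; `buildParallelWorldUrls` is a hypothetical helper, not the actual
`ShimLoader` API:
```
import java.net.URL

// Hypothetical helper: given the plugin jar path and the directory name for
// the detected Spark build (e.g. "spark302" or "spark320"), return the
// Parallel World URLs in lookup order: unshimmed root, common classes,
// then the version-specific world.
def buildParallelWorldUrls(jarPath: String, sparkDir: String): Seq[URL] = {
  val root = s"jar:file:$jarPath!/"
  Seq(root, root + "spark3xx-common/", root + sparkDir + "/").map(new URL(_))
}

// buildParallelWorldUrls("/home/spark/rapids-4-spark_2.12-21.10.0.jar", "spark302")
```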

### Late Inheritance in Public Classes

Most classes needed by the plugin can be disambiguated using Parallel World locations without
reservations. The exception is documented classes that are exposed to the user and may be loaded
before the plugin is even instantiated by the Spark runtime. The most important example of such
a class is a configurable `ShuffleManager`. `ShuffleManager` has also changed in a backwards
incompatible manner over the span of supported Spark versions.

The first issue is that, since such a class is loaded by Spark directly and outside our control,
we cannot have a single class name for our implementation that would work across versions. This
is resolved by giving the documented facade classes a shim specifier in their package names.

The second issue is that every parent class/trait in the inheritance graph is loaded using a
classloader outside the plugin's control. Therefore, all this bytecode must reside in the
conventional jar location, and it must be bitwise-identical across *all* shims. The only way
to keep the source code for shared functionality unduplicated (i.e., in
`sql-plugin/src/main/scala` as opposed to duplicated either in `shims/spark3xx` submodules or
over the `sql-plugin/src/main/3*/scala` source code roots) is to delay inheriting
`ShuffleManager` until as late as possible, as close as possible to the facade class where we
have to split the source code anyway. Use traits as much as possible for flexibility.
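
A sketch of the resulting shape with simplified, hypothetical names (in this PR, the real pair
is `ProxyRapidsShuffleInternalManagerBase` and the per-shim `ProxyRapidsShuffleInternalManager`
facades):
```
import org.apache.spark.SparkConf
import org.apache.spark.shuffle.ShuffleManager

// Shared base in sql-plugin/src/main/scala: holds all the common logic and is
// bitwise-identical across shims precisely because it does NOT extend
// ShuffleManager yet.
abstract class SharedShuffleManagerBase(conf: SparkConf, isDriver: Boolean) {
  // version-independent functionality goes here
}

// Thin facade in a version-specific source root: inherits ShuffleManager as
// late as possible and adds only the signatures that differ across versions.
abstract class VersionedShuffleManagerFacade(conf: SparkConf, isDriver: Boolean)
    extends SharedShuffleManagerBase(conf, isDriver) with ShuffleManager {
  // e.g. this version's getReader signature
}
```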

### Late Initialization of Public Classes' Ancestors

The third issue may arise from the fact that the shared logic may transitively reference a class
that, for one reason or another, resides in a Parallel World. Untangling this is tedious and may
be unnecessary. The following approach robustly prevents running into such issues.

We know that such a class is not strictly needed at the time it is loaded by Spark if the plugin
has not been loaded yet. More accurately, it may not be strictly needed until later, when the
first query can be run after the Spark SQL session and its extensions are initialized. It
improves the user experience, though, if the first query is not penalized more than necessary.
By design, the plugin guarantees that the classloader is
[set up at load time](../sql-plugin/src/main/scala/com/nvidia/spark/SQLPlugin.scala#L29)
before the `init` method is called on the DriverPlugin and ExecutorPlugin instances.

By making a visible class merely a wrapper around the real implementation, extending
`scala.Proxy` where `self` is a lazy val, we prevent classes from Parallel Worlds from being
loaded before they can be, and actually are, required. For examples, see the classes below and
the condensed sketch that follows:

1. `abstract class ProxyRapidsShuffleInternalManagerBase`
2. `class ExclusiveModeGpuDiscoveryPlugin`
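
Condensed, and with hypothetical names (`ShimLoaderStub` stands in for the real `ShimLoader`),
the pattern looks roughly like this:
```
import org.apache.spark.SparkConf

object ShimLoaderStub { // stand-in for the real ShimLoader
  def newRealImplementation(conf: SparkConf, isDriver: Boolean): AnyRef = ???
}

abstract class ProxyFacadeBase(conf: SparkConf, isDriver: Boolean) extends Proxy {
  // Nothing from a Parallel World is touched until `self` is first
  // dereferenced, which happens only after the classloader has been set up.
  lazy val self: AnyRef = ShimLoaderStub.newRealImplementation(conf, isDriver)

  // Forcing the lazy val is all that initialization needs to do.
  def initialize: Unit = self
}
```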

Note that we currently have to manually code up the delegation methods to the tune of:
```
def method(x: SomeThing) = self.method(x)
```
These delegations could be generated automatically, either by a simple tool processing the
`scalap` output or by Scala macros at build/compile time. Pull requests are welcome.
@@ -52,7 +52,7 @@ class RapidsShuffleInternalManager(conf: SparkConf, isDriver: Boolean)
 }

 class ProxyRapidsShuffleInternalManager(conf: SparkConf, isDriver: Boolean)
-    extends ProxyRapidsShuffleInternalManagerBase(conf, isDriver) {
+    extends ProxyRapidsShuffleInternalManagerBase(conf, isDriver) with ShuffleManager {

   override def getReader[K, C](
       handle: ShuffleHandle,
@@ -52,7 +52,7 @@ class RapidsShuffleInternalManager(conf: SparkConf, isDriver: Boolean)
 }

 class ProxyRapidsShuffleInternalManager(conf: SparkConf, isDriver: Boolean)
-    extends ProxyRapidsShuffleInternalManagerBase(conf, isDriver) {
+    extends ProxyRapidsShuffleInternalManagerBase(conf, isDriver) with ShuffleManager {

   override def getReaderForRange[K, C](
       handle: ShuffleHandle,
@@ -52,9 +52,9 @@ class RapidsShuffleInternalManager(conf: SparkConf, isDriver: Boolean)
 }

 class ProxyRapidsShuffleInternalManager(conf: SparkConf, isDriver: Boolean)
-    extends ProxyRapidsShuffleInternalManagerBase(conf, isDriver) {
+    extends ProxyRapidsShuffleInternalManagerBase(conf, isDriver) with ShuffleManager {

-  override def getReader[K, C](
+  def getReader[K, C](
       handle: ShuffleHandle,
       startPartition: Int,
       endPartition: Int,
@@ -64,7 +64,7 @@ class ProxyRapidsShuffleInternalManager(conf: SparkConf, isDriver: Boolean)
     self.getReader(handle, startPartition, endPartition, context, metrics)
   }

-  override def getReaderForRange[K, C](
+  def getReaderForRange[K, C](
       handle: ShuffleHandle,
       startMapIndex: Int,
       endMapIndex: Int,
@@ -53,7 +53,7 @@ class RapidsShuffleInternalManager(conf: SparkConf, isDriver: Boolean)


 class ProxyRapidsShuffleInternalManager(conf: SparkConf, isDriver: Boolean)
-    extends ProxyRapidsShuffleInternalManagerBase(conf, isDriver) {
+    extends ProxyRapidsShuffleInternalManagerBase(conf, isDriver) with ShuffleManager {

   override def getReader[K, C](
       handle: ShuffleHandle,
@@ -53,7 +53,7 @@ class RapidsShuffleInternalManager(conf: SparkConf, isDriver: Boolean)


 class ProxyRapidsShuffleInternalManager(conf: SparkConf, isDriver: Boolean)
-    extends ProxyRapidsShuffleInternalManagerBase(conf, isDriver) {
+    extends ProxyRapidsShuffleInternalManagerBase(conf, isDriver) with ShuffleManager {

   override def getReader[K, C](
       handle: ShuffleHandle,
@@ -42,7 +42,7 @@ class RapidsShuffleInternalManager(conf: SparkConf, isDriver: Boolean)
 }

 class ProxyRapidsShuffleInternalManager(conf: SparkConf, isDriver: Boolean)
-    extends ProxyRapidsShuffleInternalManagerBase(conf, isDriver) {
+    extends ProxyRapidsShuffleInternalManagerBase(conf, isDriver) with ShuffleManager {

   def getReader[K, C](
       handle: ShuffleHandle,
@@ -42,7 +42,7 @@ class RapidsShuffleInternalManager(conf: SparkConf, isDriver: Boolean)
 }

 class ProxyRapidsShuffleInternalManager(conf: SparkConf, isDriver: Boolean)
-    extends ProxyRapidsShuffleInternalManagerBase(conf, isDriver) {
+    extends ProxyRapidsShuffleInternalManagerBase(conf, isDriver) with ShuffleManager {

   def getReader[K, C](
       handle: ShuffleHandle,
@@ -42,7 +42,7 @@ class RapidsShuffleInternalManager(conf: SparkConf, isDriver: Boolean)
 }

 class ProxyRapidsShuffleInternalManager(conf: SparkConf, isDriver: Boolean)
-    extends ProxyRapidsShuffleInternalManagerBase(conf, isDriver) {
+    extends ProxyRapidsShuffleInternalManagerBase(conf, isDriver) with ShuffleManager {

   def getReader[K, C](
       handle: ShuffleHandle,
@@ -43,7 +43,7 @@ class RapidsShuffleInternalManager(conf: SparkConf, isDriver: Boolean)


 class ProxyRapidsShuffleInternalManager(conf: SparkConf, isDriver: Boolean)
-    extends ProxyRapidsShuffleInternalManagerBase(conf, isDriver) {
+    extends ProxyRapidsShuffleInternalManagerBase(conf, isDriver) with ShuffleManager {

   def getReader[K, C](
       handle: ShuffleHandle,
@@ -42,7 +42,7 @@ class RapidsShuffleInternalManager(conf: SparkConf, isDriver: Boolean)


 class ProxyRapidsShuffleInternalManager(conf: SparkConf, isDriver: Boolean)
-    extends ProxyRapidsShuffleInternalManagerBase(conf, isDriver) {
+    extends ProxyRapidsShuffleInternalManagerBase(conf, isDriver) with ShuffleManager {

   def getReader[K, C](
       handle: ShuffleHandle,
@@ -43,7 +43,7 @@ class RapidsShuffleInternalManager(conf: SparkConf, isDriver: Boolean)


 class ProxyRapidsShuffleInternalManager(conf: SparkConf, isDriver: Boolean)
-    extends ProxyRapidsShuffleInternalManagerBase(conf, isDriver) {
+    extends ProxyRapidsShuffleInternalManagerBase(conf, isDriver) with ShuffleManager {

   def getReader[K, C](
       handle: ShuffleHandle,
@@ -419,33 +419,47 @@ abstract class RapidsShuffleInternalManagerBase(conf: SparkConf, val isDriver: B
   }
 }

+/**
+ * Trait that makes it easy to check whether we are dealing with
+ * a RAPIDS Shuffle Manager
+ *
+ * TODO name does not match its function anymore

> **Collaborator:** Are you going to fix this TODO?
>
> **Collaborator (Author):** I filed #3624 for this.

+ */
 trait VisibleShuffleManager {
   def isDriver: Boolean
   def initialize: Unit
 }

+/**
+ * A simple proxy wrapper allowing to delay loading of the
+ * real implementation to a later point when ShimLoader
+ * has already updated Spark classloaders.
+ *
+ * @param conf
+ * @param isDriver
+ */
 abstract class ProxyRapidsShuffleInternalManagerBase(
     conf: SparkConf,
     override val isDriver: Boolean
-) extends ShuffleManager with VisibleShuffleManager with Proxy {
+) extends VisibleShuffleManager with Proxy {

   // touched in the plugin code after the shim initialization
   // is complete
-  override lazy val self: ShuffleManager =
+  lazy val self: ShuffleManager =
     ShimLoader.newInternalShuffleManager(conf, isDriver)
       .asInstanceOf[ShuffleManager]

   // This function touches the lazy val `self` so we actually instantiate
   // the manager. This is called from both the driver and executor.
   // In the driver, it's mostly to display information on how to enable/disable the manager,
   // in the executor, the UCXShuffleTransport starts and allocates memory at this time.
-  override def initialize: Unit = self
+  def initialize: Unit = self

   //
   // Signatures unchanged since 3.0.1 follow
   //

-  override def getWriter[K, V](
+  def getWriter[K, V](
       handle: ShuffleHandle,
       mapId: Long,
       context: TaskContext,
@@ -454,16 +468,16 @@ abstract class ProxyRapidsShuffleInternalManagerBase(
     self.getWriter(handle, mapId, context, metrics)
   }

-  override def registerShuffle[K, V, C](
+  def registerShuffle[K, V, C](
       shuffleId: Int,
       dependency: ShuffleDependency[K, V, C]
   ): ShuffleHandle = {
     self.registerShuffle(shuffleId, dependency)
   }

-  override def unregisterShuffle(shuffleId: Int): Boolean = self.unregisterShuffle(shuffleId)
+  def unregisterShuffle(shuffleId: Int): Boolean = self.unregisterShuffle(shuffleId)

-  override def shuffleBlockResolver: ShuffleBlockResolver = self.shuffleBlockResolver
+  def shuffleBlockResolver: ShuffleBlockResolver = self.shuffleBlockResolver

-  override def stop(): Unit = self.stop()
+  def stop(): Unit = self.stop()
 }