
Switch from Java to Scala Spark API #57

Open
mboes opened this issue Nov 13, 2016 · 16 comments

@mboes
Member

mboes commented Nov 13, 2016

We currently bind the Java API for Spark. @alpmestan likely remembers the rationale better than me. I assume it was mostly a choice by default, and because the Java API was more straightforward to bind: it doesn't refer to "non-standard", language-specific types such as scala.Tuple2 and scala.Function1, and, more importantly, it doesn't expose Scala-specific implicit arguments in function signatures. Those implicits become entirely explicit, like any other argument, when calling such a function from another JVM language.

However, there are downsides to the Java API:

  • The Java API features a proliferation of interfaces for function-like objects. Counter-intuitively, FlatMapFunction is not a subtype of Function. There is also ForeachFunction, PairFunction, etc. Since these are all unrelated, we can't straightforwardly write a uniform Reflect (Closure (a -> b)) JFun1 instance. The Scala API, OTOH, only has Function1, Function2, etc.
  • The Scala semantics fit Haskell better:
    - there are no void methods: all methods return a value;
    - pairs are packed into tuple objects, the way one would expect in Haskell.
  • The inheritance hierarchy is messy. Whereas in Scala we have HadoopRDD < RDD < Object, in Java we have JavaHadoopRDD < JavaPairRDD < Object, and JavaHadoopRDD and JavaPairRDD both reimplement wrappers implementing the Java-specific JavaRDDLike interface.

If we move to the Scala API, we could tighten the codomain of Uncurry: the Proc code for void methods would no longer be needed. But the main goal is really that all function objects would be treated uniformly according to their arities, allowing us to bind RDD.mapPartitions, RDD.foreach, etc. without requiring overlapping instances or newtype wrappers around the closures.
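
For illustration, a rough sketch in the call-and-friends notation, assuming the proposed uniform Reflect (Closure (a -> b)) instance targeting scala.Function1; an RDD reference rdd and ClassTag evidence jtag (constructed as in the snippet below) are assumed to be in scope:

do jpred <- reflect predicateClosure    -- predicateClosure :: Closure (a -> Bool)
   jf    <- reflect mapClosure          -- mapClosure :: Closure (a -> b)
   kept  <- call rdd "filter" [coerce jpred]    -- a plain scala.Function1; no evidence argument
   call kept "map" [coerce jf, coerce jtag]     -- same shape of call, plus the ClassTag evidence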

As a side-effect, we'd have slightly less overhead, since we'd be calling into the Scala methods directly, rather than trampolining through their respective Java wrappers first.

So how do we deal with implicits? The only implicits in the API are evidence for Ordering constraints and ClassTag. Our bindings do know, at the call site, the ground type they're instantiated at, so we can generate ClassTag evidence on demand, inside the binding. Same goes for Ordering evidence. For ClassTag, creating evidence goes something like this:

do klass <- classOf x
   [java| scala.reflect.ClassTag$.MODULE$.apply($klass) |]
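
Sketching the analogous move for Ordering evidence (with the assumption that comparatorToOrdering in scala.math.Ordering's companion object can be called this way, and with $comparator an already-obtained java.util.Comparator for the element type):

[java| scala.math.Ordering$.MODULE$.comparatorToOrdering($comparator) |]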

This isn't a huge change, by the way: pretty much only the RDD.hs and Closure.hs source files would need to change. The other bindings would stay largely intact.

cc @alpmestan @robinbb @dcoutts

@robinbb

robinbb commented Nov 13, 2016

Would this change result in new dependencies?

@mboes
Member Author

mboes commented Nov 13, 2016

New Cabal or Java packages? We already depend on the Scala libraries indirectly via Spark (Scala is Spark's implementation language), so I don't think we would have any new dependencies.

@alpmestan
Contributor

Indeed, we would probably just make the dependency on Scala (and its standard library, I guess?) direct as opposed to transitive.

@mboes
Member Author

mboes commented Nov 14, 2016 via email

@alpmestan
Contributor

@mboes Just a little note on your initial post here before I forget, but it's really a detail more than anything else: it would be nice to avoid using inline-java in sparkle for now for the sake of supporting ghc 7.10.

@mboes
Member Author

mboes commented Nov 14, 2016

Didn't mean to imply that inline-java would be required. It was merely a more convenient notation for the sake of discussion than using call and friends.

@mboes changed the title from "Switch to from Java to Scala Spark API" to "Switch from Java to Scala Spark API" on Nov 16, 2016
@robinbb

robinbb commented Dec 7, 2016

@mboes How much effort is required for this change? Would many more tests be required to verify that this is done correctly?

@robinbb

robinbb commented Feb 6, 2017

@mboes @facundominguez Can you give an estimate about how much effort is required for this change?

@mboes
Member Author

mboes commented Feb 6, 2017

@robinbb 1-2 days I'd say.

@robinbb

robinbb commented Feb 6, 2017

@mboes Thank you for the estimate. Next question: would this create a build-time dependency on Scala that is not now there?

@mboes
Member Author

mboes commented Feb 6, 2017

@robinbb It would turn an indirect dependency on the Scala standard library (via Spark) into a direct one, so no new dependencies overall.

@robinbb

robinbb commented Feb 6, 2017

@mboes Understood. Seems like a worthy change to make.

@edsko
Contributor

edsko commented Feb 22, 2017

Per @mboes' request, here's a summary of my attempt to add a binding to mapPartitions for dataframes. The Java method has the signature

<U> Dataset<U> mapPartitions(MapPartitionsFunction<T,U> f, Encoder<U> encoder)

Since we work with dataframes, we need this instantiated at U = Row. Right off the bat this introduced two challenges: we need to extend the Sparkle Closure module to support this MapPartitionsFunction, and we need to somehow get that Encoder. The former is not really related to this particular ticket, although @facundominguez, @alpmestan and I had some ideas on how to generalize the Closure stuff, which should probably be described elsewhere; the latter, however, pertains to the Java/Scala question.

It is my understanding that in Scala this encoder would have been implicit and nobody would need to worry about it. But in the Java world we need to provide it explicitly, and nobody seems to say how. There are some mysterious references to RowEncoder on the internet, but the Spark API docs don't say anything about it; the only thing we found was its Scala source code in the Spark GitHub repo. Eventually I managed to come up with a standalone Java program that works:

    Encoder<Row> enc   = RowEncoder.apply(ds.schema());
    Dataset<Row> split = ds.mapPartitions(iter -> new Split(iter), enc);

(where ds :: Dataset<Row>). Binding to this from the Haskell side was a separate challenge. Initially I tried

newtype RowEncoder = RowEncoder (J ('Class "org.apache.spark.sql.Encoder"))
instance Coercible RowEncoder ('Class "org.apache.spark.sql.Encoder")

getRowEncoder :: StructType -> IO RowEncoder
getRowEncoder st =
    callStatic (sing :: Sing "org.apache.spark.sql.catalyst.encoders.RowEncoder") "apply" [coerce st]

but that yielded the dreaded NoSuchMethodError. After looking at the source code for apply, I realized that it actually yields an ExpressionEncoder, yet another undocumented class. So changing this to

newtype RowEncoder = RowEncoder (J ('Class "org.apache.spark.sql.catalyst.encoders.ExpressionEncoder"))
instance Coercible RowEncoder ('Class "org.apache.spark.sql.catalyst.encoders.ExpressionEncoder")

now makes getRowEncoder above work. However, the call to mapPartitions then threw a NoSuchMethodError. It turns out the solution was to manually upcast the ExpressionEncoder to an Encoder:

mapPartitionsDF :: Closure (Iterator Row -> Iterator Row)
                -> DataFrame -> IO DataFrame
mapPartitionsDF fun df = do
    RowEncoder enc <- getRowEncoder =<< schema df
    let enc' :: J ('Class "org.apache.spark.sql.Encoder")
        enc' = unsafeCast enc
    jfun <- reflect (HaskellMapPartitionsFunction fun)
    call df "mapPartitions" [coerce jfun, coerce enc']

This now finally works. Well, actually, I'm still getting an exception, but at least it's finding and invoking the method.

@mboes
Member Author

mboes commented Apr 6, 2017

So how do we deal with implicits? The only implicits in the API are evidence for Ordering constraints and ClassTag. Our bindings do know, at the call site, the ground type they're instantiated at, so we can generate ClassTag evidence on demand, inside the binding. Same goes for Ordering evidence.

I just tested a proof of concept of this idea: https://gist.github.com/mboes/1f31da7e1859371ce5ab74b51397c492

It seems to work!
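
Restating the shape of the idea in the notation from the original post (a minimal sketch, not the gist's code; rdd, a reflected scala.Function1 value jfun, and a value x of the result type are assumed given):

do klass <- classOf x
   jtag  <- [java| scala.reflect.ClassTag$.MODULE$.apply($klass) |]
   call rdd "map" [coerce jfun, coerce jtag]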

@edsko
Contributor

edsko commented Apr 6, 2017

Note that once we move to the Dataset API we'll have a potentially more challenging implicit to deal with: Encoding.

@mboes
Member Author

mboes commented Apr 6, 2017

Do you mean Encoder? It looks like those can be created explicitly using static methods in the Encoders class.
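
Something along these lines should do it (a sketch only; ds is assumed to be a Dataset of strings, jfun a reflected scala.Function1, and I'm assuming the compiled Scala Dataset.map takes its Encoder as an explicit trailing argument):

do enc <- [java| org.apache.spark.sql.Encoders.STRING() |]
   call ds "map" [coerce jfun, coerce enc]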

mboes added a commit that referenced this issue Apr 9, 2017
Using inline-java slows down the compilation of sparkle, but is safer
because we can thus get the benefit of *both* type checkers (Java and
Haskell). In fact the extra safety isn't just theoretical: this patch
also includes a fix to the binding for `treeAggregate`, which was
supplying arguments in the wrong order.

This is preliminary work ahead of implementing #57, which we can do
serenely from the moment that the type checkers have our back.

This patch only switches over RDD for now. The rest can come later.
@mboes removed their assignment on Jul 15, 2018