
[SPARK-24256][SQL] ExpressionEncoder should support user-defined types as fields of Scala case class and tuple #21310

Closed

Conversation


fangshil commented May 12, 2018

What changes were proposed in this pull request?

Right now, ExpressionEncoder supports ser/de of primitive types, as well as Scala case classes, tuples, and Java bean classes. Spark's Dataset natively supports these types, but we find that Dataset is not flexible enough for other user-defined types and encoders.

For example, spark-avro has an AvroEncoder for ser/de of Avro types in Dataset. Although we can use AvroEncoder to define a Dataset whose type is an Avro Generic or Specific Record, such an Avro-typed Dataset has many limitations:

  1. We cannot use joinWith on this Dataset, since the result is a tuple and Avro types cannot be fields of that tuple.
  2. We cannot use some type-safe aggregation methods on this Dataset, such as KeyValueGroupedDataset's reduceGroups, since the result is also a tuple.
  3. We cannot combine an Avro SpecificRecord with additional primitive fields in a case class, which we find is a very common use case.

The root limitation is that ExpressionEncoder does not support ser/de of a Scala case class or tuple whose fields are of some other user-defined type that has its own Encoder.
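To make the limitation concrete, here is a minimal, hypothetical sketch (not taken from the patch): Encoders.kryo stands in for a user-defined encoder such as AvroEncoder, and LegacyRecord and Enriched are made-up names.

import org.apache.spark.sql.{Encoder, Encoders, SparkSession}

// Stand-in for an externally defined type such as an Avro SpecificRecord.
class LegacyRecord(val payload: Array[Byte]) extends Serializable
// Case class that embeds the externally defined type alongside a primitive field.
case class Enriched(score: Double, record: LegacyRecord)

val spark = SparkSession.builder().master("local[*]").appName("demo").getOrCreate()
import spark.implicits._

// A user-defined encoder for the standalone type (Kryo here, purely for illustration).
implicit val legacyEnc: Encoder[LegacyRecord] = Encoders.kryo[LegacyRecord]
val records = spark.createDataset(Seq(new LegacyRecord(Array[Byte](1, 2, 3))))  // works

// Fails at encoder derivation ("No Encoder found for LegacyRecord"): ScalaReflection
// derives the encoder for Enriched from the case class structure alone and never
// consults the implicit legacyEnc for the nested field.
// val broken = records.map(r => Enriched(1.0, r))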

To address this issue, we propose a trait that serves as a contract (between ExpressionEncoder and any other user-defined Encoder) to let case classes, tuples, and Java beans support user-defined types as fields.

With this proposed patch and a minor modification to AvroEncoder, we remove the above-mentioned limitations by setting the cluster-default conf spark.expressionencoder.org.apache.avro.specific.SpecificRecord = com.databricks.spark.avro.AvroEncoder$
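For readers who have not looked at the diff, here is a minimal sketch of the kind of contract the description refers to; the trait and method names below are hypothetical, not the ones in the patch. The idea is that a user-defined Encoder exposes the Catalyst serializer/deserializer expressions for its type so that ExpressionEncoder can splice them in when the type appears as a field of a case class, tuple, or Java bean; the conf key above appears to map a user class name to the object implementing the contract.

import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.types.DataType

// Hypothetical contract between ExpressionEncoder and a user-defined Encoder.
trait ExternalTypeEncoder[T] {
  // Catalyst type that values of T are serialized to (e.g. BinaryType for Avro).
  def catalystType: DataType
  // Expression that turns an external value of T into its Catalyst representation.
  def serializerFor(input: Expression): Expression
  // Expression that turns a Catalyst value back into an external T.
  def deserializerFor(input: Expression): Expression
}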

We have implemented this patch internally and have been using it for a few quarters. We want to propose it upstream because we think it is a useful feature that makes Dataset more flexible with respect to user types.

How was this patch tested?

I have tested this patch internally. I did not write unit tests, since the user-defined Encoder (AvroEncoder) is defined outside Spark.
I am looking for suggestions on how to write unit tests for this patch.

@maropu
Member

maropu commented May 14, 2018

You need to add tests first. Could you?

@fangshil
Author

I will investigate how we can add tests for this. Thoughts are welcome.

@fangshil
Author

@viirya @cloud-fan before I add tests, could you take a look and advise whether the approach taken in this patch is acceptable?

@viirya
Member

viirya commented May 15, 2018

I will take a look later today.

@viirya
Member

viirya commented May 21, 2018

I'm not sure if I'm looking into the correct project, but it seems the spark-avro project doesn't have AvroEncoder yet. The PR that is going to add AvroEncoder looks like this one: databricks/spark-avro#217.

@fangshil
Author

@viirya thanks for the feedback. We customized AvroEncoder internally based on that open source PR, since it never got merged into spark-avro. We propose this feature because it should apply to every user-defined Encoder, not just AvroEncoder.

@HyukjinKwon
Member

@fangshil, Avro support is now in Spark. How does that relate to this PR? Should we move forward?

@HyukjinKwon
Member

@fangshil also, can you point me to the PR that was not merged into spark-avro, please, so that I can check it when I have some time?

@fangshil
Author

@HyukjinKwon thanks for the update. What do you mean by "Avro support is now in Spark"? The PR I mentioned is https://github.com/databricks/spark-avro/pull/215/files. I have been maintaining this PR internally at my company for a while, on Spark 2.3.

@cloud-fan
Contributor

I think this PR is blocked by adding UDT support officially (it's currently internal). Maybe we can start a thread about UDT on the dev list.

@AmplabJenkins

Can one of the admins verify this patch?

@fangshil
Author

fangshil commented Sep 5, 2018

To summarize our discussion in this PR:
spark-avro is now merged into Spark as a built-in data source. The upstream community is not going to merge the AvroEncoder to support Avro types in Dataset; instead, the plan is to expose the user-defined type API to support defining arbitrary user types in Dataset.

The purpose of this patch is to enable ExpressionEncoder to work together with other kinds of Encoders, but it seems upstream prefers to go with UDT. Given this, we can close this PR, and we will start the discussion on UDT in another channel.

fangshil closed this Sep 5, 2018
@benmccann
Contributor

There's a PR pending to add support for Dataset of Avro here: #22878


kunkun-tang left a comment


/**
* Creates an encoder for Java Bean of type T.
*
* T must be publicly accessible.
*
* supported types for java bean field:
* - primitive types: boolean, int, double, etc.
* - boxed types: Boolean, Integer, Double, etc.
* - String
* - java.math.BigDecimal, java.math.BigInteger
* - time related: java.sql.Date, java.sql.Timestamp
* - collection types: only array and java.util.List currently, map support is in progress
* - nested java bean.
*
* @since 1.6.0
*/
def bean[T](beanClass: Class[T]): Encoder[T] = ExpressionEncoder.javaBean(beanClass)

If we adopt this enhancement, should the related documentation be updated as well? @fangshil
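For context, typical use of the quoted API looks roughly like the following sketch; Person is a hypothetical bean class, and a surrounding SparkSession named spark is assumed.

import org.apache.spark.sql.Encoders

// Hypothetical Java-style bean with a no-arg constructor and getters/setters.
class Person extends Serializable {
  private var name: String = _
  private var age: Int = _
  def getName: String = name
  def setName(n: String): Unit = { name = n }
  def getAge: Int = age
  def setAge(a: Int): Unit = { age = a }
}

val personEncoder = Encoders.bean(classOf[Person])
// val people = spark.createDataset(Seq(new Person))(personEncoder)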

@coleleahy

As @fangshil points out, because Spark's encoder-generating facilities in ScalaReflection and JavaTypeInference cannot be made aware of a user-defined Encoder[T], it is fairly inconvenient to work with a Dataset[T] for which such an encoder has been defined. He mentions two reasons:

  1. Common operations like joins and aggregations require the ability to encode a Dataset[(T, S)] or the like, which Spark will not know how to encode -- precisely because the encoder-generating facilities in ScalaReflection cannot see the custom user-defined Encoder[T].

  2. The perfectly reasonable desire to create a case class or Java bean containing a member of type T is thwarted, again because the encoder-generating facilities in ScalaReflection and JavaTypeInference cannot see the custom Encoder[T].

Now, the first problem can perhaps be worked around, for example by implicitly defining an Encoder[(T, S)] whenever there are implicit instances of Encoder[T] and Encoder[S] (see the sketch below). However, the second problem remains, and that is precisely what the present PR sets out to solve.
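A sketch of that workaround, under the assumption that the custom encoders are ExpressionEncoders under the hood (as AvroEncoder is), since Encoders.tuple requires that:

import org.apache.spark.sql.{Encoder, Encoders}

object TupleEncoderImplicits {
  // Derive a tuple encoder whenever encoders for both components are in implicit scope.
  implicit def tupleEncoder[T, S](implicit et: Encoder[T], es: Encoder[S]): Encoder[(T, S)] =
    Encoders.tuple(et, es)
}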

I understand if the Spark community would prefer to take another approach to solving this problem, but then I'd like to find out what that approach is.

For instance, is the consensus that the best approach is to create a UserDefinedType[T] and register it through the currently private UDTRegistration API? If so, could someone please point me to a thread in the Spark dev list that can shed light on the justification behind this choice, and on the timeline for making that API public?
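For reference, here is a rough sketch of what the UserDefinedType[T] route looks like today; Point and PointUDT are made-up names, and because UserDefinedType and UDTRegistration are currently private[spark], this only compiles from code placed under an org.apache.spark package.

package org.apache.spark.example

import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData}
import org.apache.spark.sql.types._

case class Point(x: Double, y: Double)

class PointUDT extends UserDefinedType[Point] {
  // Store a Point as an array of two doubles in Catalyst.
  override def sqlType: DataType = ArrayType(DoubleType, containsNull = false)
  override def serialize(p: Point): Any = new GenericArrayData(Array[Any](p.x, p.y))
  override def deserialize(datum: Any): Point = datum match {
    case a: ArrayData => Point(a.getDouble(0), a.getDouble(1))
  }
  override def userClass: Class[Point] = classOf[Point]
}

object PointUDTSetup {
  // Register the mapping before creating Datasets that contain Point fields.
  def register(): Unit =
    UDTRegistration.register(classOf[Point].getName, classOf[PointUDT].getName)
}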

Finally, I'd like to ask why, even if the UserDefinedType[T] approach is preferred, the work in the present PR isn't being considered as a supplementary enhancement -- one which many Spark users would find very convenient.

@coleleahy

coleleahy commented Aug 9, 2019

FWIW I am aware of this thread in the Spark dev list as well as this Jira issue. Looking for information beyond what's included there.
