[SPARK-25789][SQL] Support for Dataset of Avro #22878
Conversation
Test build #98217 has finished for PR 22878 at commit
@xuanyuanking. To give credit to @bdrillard correctly, you need to add his commits. The Apache Spark community officially recommends to show
@dongjoon-hyun Thanks for your comment, let me see how to achieve this; @bdrillard's commits are based on databricks/spark-avro.
Also cc @bdrillard; linking this to #21348.
@xuanyuanking, thanks for the work!
I wonder if that can be handled by the merge script, though. I think it's okay to just pick up some commits there and rebase them onto this branch even if they become empty commits. That makes it easier for committers to put his name as primary author when it's merged.
Just quickly and roughly tested: the merge script looks like it only recognises the main author of each commit in a PR. Let's just push a commit into here.
Test build #98244 has finished for PR 22878 at commit
Thanks @gengliangwang and @HyukjinKwon. Done in this commit.
Test build #98251 has finished for PR 22878 at commit
Here's an initial review.
First, I appreciate the efforts of the group here to include me in the commit history of this PR. Given that this is a port of commits from a separate project, I wasn't anticipating that level of commitment, and I'm appreciative of that. Thanks everyone!
To summarize the two main things:
- It might be nice to add a test case over SpecificRecord. That would require either generating or importing some Java Avro classes (I link to some I'd made for the Spark-Avro PR in the comment on this topic).
- We can do some refactoring of the NewInstance expression to remove the need for separate InitializeAvroObject and InitializeJavaBean expressions. That refactor was prepared in [SPARK-22739][Catalyst] Additional Expression Support for Objects #21348 and I describe it more in a comment here, but if it's considered too large a change for the scope of this PR, I'm happy to create a followup PR for it.
    val ds = rdd.toDS()
    assert(ds.count() == genericRecords.size)
    context.stop()
  }
}
The above tests are all for GenericRecord Avro classes. It might be good to generate an Avro class having a schema similar to the GenericRecord described above, so that we can test an instance extending SpecificRecord (which will probably be the most commonly used Avro class for the encoder).
There was one such class in the Spark-Avro project, but I can understand why it may not have been copied over in this PR.
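A minimal sketch of such a test, assuming a generated class TestRecord extending SpecificRecord; both the class name and the AvroEncoder.of entry point are illustrative here, not the confirmed API of this PR:

```scala
// Hypothetical test over an Avro-generated SpecificRecord subclass.
// TestRecord is assumed to be generated from a schema like test.avsc via
// the Avro compiler; AvroEncoder.of mirrors the bean-encoder-style entry
// point discussed in this PR.
test("encoder resolves for a SpecificRecord class") {
  val record = TestRecord.newBuilder()
    .setName("foo")
    .setValue(42L)
    .build()
  val encoder = AvroEncoder.of(classOf[TestRecord])
  val ds = spark.createDataset(Seq(record))(encoder)
  assert(ds.count() == 1)
  assert(ds.head().getName.toString == "foo")
}
```

The round trip through the Dataset is what exercises the SpecificRecord-specific schema lookup, which is the extra code path a GenericRecord test does not cover.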
Yep, actually I tested the cases you mentioned myself, but it needs a lot of Avro-generated code. Moreover, IIUC, the testing of SpecificRecord just exercises one more piece of logic, avroClass.getMethod("getClassSchema"), so I think there's no need to add all that generated code for this test. If we really want to achieve this, maybe we could add a small, simple specific record example based on the existing test.avsc? Or if we just want to show the usage, maybe adding corresponding documentation is enough. WDYT :)
Yeah, I'm comfortable with that. The rest of the encoder code path would be the same.
 * @param args a sequence of expression pairs that will respectively evaluate to the index of
 *             the record in which to insert, and the argument value to insert
 */
case class InitializeAvroObject(
It's possible to refactor the NewInstance expression in this objects class to support construction of Avro classes, which would eliminate the need for a separate InitializeAvroObject. Interestingly, the same refactor would also generalize in such a way as to allow us to remove the need for a separate InitializeJavaBean expression.
To summarize the change: NewInstance would accept a Seq of Expression for the arguments to the instance's constructor, but also a Seq of (String, Seq[Expression]) tuples, being an ordered list of setter methods and the methods' respective arguments to call after the object has been constructed.
This covers both the construction of Java beans and the construction and initialization of SpecificRecord instances.
See the necessary changes to NewInstance, here.
Also an additional clause to TreeNode, here.
And then the changes to JavaTypeInference, here.
If this refactor is considered a bit too complicated for this PR, we can start with an InitializeAvroObject and do some cleanup in a followup. As background, a refactor like this was initially suggested by @cloud-fan, see comment.
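A rough sketch of what the generalized expression could look like; the field names and shapes below are illustrative, based only on the description above, not on the actual #21348 diff:

```scala
// Illustrative only: NewInstance generalized with post-construction setter
// calls, as described above. `setters` is an ordered list of
// (setterName, setterArgs) pairs invoked after the constructor runs, which
// covers both Java beans and Avro SpecificRecord instances.
case class NewInstance(
    cls: Class[_],
    constructorArgs: Seq[Expression],              // arguments to the constructor
    setters: Seq[(String, Seq[Expression])],       // setters applied afterwards
    propagateNull: Boolean,
    dataType: DataType)
  // Codegen would roughly emit:
  //   val obj = new Cls(arg1, arg2, ...)
  //   obj.setX(...); obj.setY(...)
  //   obj
```

With empty `setters` this degenerates to today's NewInstance, and with an empty `constructorArgs` plus a list of setters it subsumes InitializeJavaBean, which is why the refactor removes the need for both special-purpose expressions.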
Yep, as per my comment in #21348 (comment), AFAIK we can maybe keep the 2 PRs for convenience of review; also there's some refactoring work on JavaTypeInference after #21348. We need more advice from Wenchen.
val record = new GenericRecordBuilder(schema).build
val row = expressionEncoder.toRow(record)
val recordFromRow = expressionEncoder.resolveAndBind().fromRow(row)
assert(record.toString == recordFromRow.toString)
To avoid confusing reviewers, adding some more notes here: after adding the map type in this case, record.get(15).equals(recordFromRow.get(15)) is false. This is because the keys/values in the map of record are Utf8 while they are CharSequence in recordFromRow, so directly calling map.equals returns false. That is why the check here compares the results as strings.
Avro GenericData.compare():
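The underlying pitfall is general to CharSequence on the JVM, not specific to Avro's Utf8: two CharSequence implementations holding the same characters are not equals() to each other, as this self-contained Java sketch shows.

```java
// Two CharSequence implementations with identical characters are not equal():
// equals() is implementation-specific, so map comparisons fail even when the
// textual content matches. Avro's Utf8 vs. java.lang.String hits exactly this.
public class CharSequenceEquality {
    public static void main(String[] args) {
        CharSequence asString = "key";
        CharSequence asBuilder = new StringBuilder("key");

        System.out.println(asString.equals(asBuilder));             // false
        System.out.println(asString.toString()
                .contentEquals(asBuilder));                         // true
        // Comparing via toString() sidesteps the implementation mismatch,
        // which is why the test above compares the records' toString output.
        System.out.println(asString.toString()
                .equals(asBuilder.toString()));                     // true
    }
}
```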
Test build #98323 has finished for PR 22878 at commit
Test build #98391 has finished for PR 22878 at commit
@gengliangwang thanks for your review on this PR. Do you have any other comments?
retest this please.
Overall LGTM. cc @cloud-fan
Test build #100069 has finished for PR 22878 at commit
Test build #100093 has finished for PR 22878 at commit
Test build #100094 has started for PR 22878 at commit
General question: is this based on spark-avro, and/or intended to supersede it?
@srowen IMO, this isn't based on spark-avro; it's more like a supplement to it.
Test build #116318 has finished for PR 22878 at commit
@xuanyuanking gentle ping.
Co-Authored-By: xuanyuanking <xyliyuanjian@gmail.com>
Test build #117095 has finished for PR 22878 at commit
retest this please.
Test build #117107 has finished for PR 22878 at commit
The failed test
retest this please
Test build #117117 has finished for PR 22878 at commit
Thank you @xuanyuanking. Is this something we can backport to 2.4.x? Do you see any issues?
We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
Can this be re-opened? Is there someone who can take it across the finish line? This PR seems really close. It would be hugely useful to those of us who are working with Kafka + Spark Structured Streaming + specific Avro types.
Yes, this looks very interesting and could be beneficial for the community. We may need to handle a few corner cases before merging it, but it surely can be merged. Since this is a new feature, it could be difficult to make a case for backports to any existing release branches. I will take over this work and see it through.
Feel free to take over by opening your own PRs, @anleib. As we know, this PR is an ancient one.
At LinkedIn we've been using a fork of this PR for many years, and have a number of internal enhancements on top of it. I'm happy to put up a PR to open-source our work, but past conversations on this PR indicate some resistance to bringing it into the Spark project (e.g. this comment), and past dev-list discussions (here and here) haven't generated much interest. @HyukjinKwon -- do you still have general concerns with this work being pulled into Spark?
@xkrogen To be clear about your request, I don't actually have any preference for this one. So I don't think I can help you push it through.
The reason for the move is that the next-gen DaliSpark reader needs some Spark Avro 2.4 features. Basically moving apache#22878, also fixing some glitches during the move so that AvroEncoder can recognize the existing internal SQL API. Fix an API change. RB=1559999 BUG=LIHADOOP-44097 G=superfriends-reviewers R=edlu,fli,yezhou,mshen A=fli
What changes were proposed in this pull request?
Please credit @bdrillard, since this is mainly based on his previous work.
This PR adds support for a Dataset of Avro records via an API that allows the user to provide a class to an Encoder for Avro, analogous to the bean encoder. It adds:
- an ObjectCast and an InitializeAvroObject expression (analogous to InitializeJavaBean);
- an AvroTypeInference for mapping between Avro objects and SQL DataTypes (analogous to JavaTypeInference).

How was this patch tested?
Add UT in AvroSuite.scala and a manual test by modifying SQLExample with the external avro package.
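A sketch of how the proposed API might be used, analogous to Encoders.bean for Java beans; the AvroEncoder.of entry point and the ExampleRecord class are illustrative assumptions here, not the confirmed final API:

```scala
// Hypothetical usage of the Avro encoder proposed in this PR.
// ExampleRecord is assumed to be an Avro-generated SpecificRecord class;
// AvroEncoder.of plays the role Encoders.bean plays for Java beans.
import org.apache.spark.sql.{Dataset, Encoder, SparkSession}

val spark = SparkSession.builder().appName("avro-encoder-demo").getOrCreate()

val encoder: Encoder[ExampleRecord] = AvroEncoder.of(classOf[ExampleRecord])

val records = Seq(
  ExampleRecord.newBuilder().setId(1L).setName("a").build(),
  ExampleRecord.newBuilder().setId(2L).setName("b").build())

// Typed Dataset operations work directly on the Avro class.
val ds: Dataset[ExampleRecord] = spark.createDataset(records)(encoder)
ds.filter(_.getId > 1L).show()
```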