Added ParquetCachedBatchSerializer support for Databricks #2880

Merged
10 commits merged into NVIDIA:branch-21.08 from db_cache on Jul 12, 2021

Conversation

@razajafri (Collaborator) commented Jul 7, 2021

This PR adds ParquetCachedBatchSerializer support for Databricks and adds nightly tests.

fixes #2856
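
For context on how this is used: the cache serializer is enabled through Spark's spark.sql.cache.serializer static conf, pointing at the shim class from the cache-serializer docs. Below is a minimal, spark-shell style sketch rather than the exact setup; on Databricks the conf would normally go into the cluster's Spark config since it is a static conf, and the path is only an example.

import org.apache.spark.sql.SparkSession

// Sketch only: spark.sql.cache.serializer must be set before the first
// SparkSession starts (e.g. via --conf or the cluster's Spark config).
val spark = SparkSession.builder()
  .appName("pcbs-example")
  .config("spark.sql.cache.serializer",
    "com.nvidia.spark.rapids.shims.spark311.ParquetCachedBatchSerializer")
  .getOrCreate()

// With the serializer enabled, cached data is materialized through PCBS and
// read back by (Gpu)InMemoryTableScan.
spark.read.parquet("/tmp/testparquet").cache().count()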

razajafri added 2 commits July 6, 2021 16:52
Signed-off-by: Raza Jafri <rjafri@nvidia.com>
Signed-off-by: Raza Jafri <rjafri@nvidia.com>
@razajafri (Collaborator Author)

@NvTimLiu can you take a particular look at run-tests.py?

tgravescs previously approved these changes Jul 7, 2021
Signed-off-by: Raza Jafri <rjafri@nvidia.com>
@razajafri (Collaborator Author)

build

@viadea (Collaborator) commented Jul 7, 2021

@razajafri As per my tests with the fix jar, GpuInMemoryTableScan is in place.
However, when comparing the Databricks plan with the Standalone plan, there is an extra GpuColumnarToRow right before the InMemoryRelation.

For example,

  1. Databricks plan:
== Physical Plan ==
GpuColumnarToRow false
+- GpuHashAggregate(keys=[], functions=[gpucount(distinct _gen_alias_217#217)]), filters=List(None))
   +- GpuShuffleCoalesce 2147483647
      +- GpuColumnarExchange gpusinglepartitioning$(), ENSURE_REQUIREMENTS, [id=#963]
         +- GpuHashAggregate(keys=[], functions=[partial_gpucount(distinct _gen_alias_217#217)]), filters=List(None))
            +- GpuHashAggregate(keys=[_gen_alias_217#217], functions=[]), filters=List())
               +- GpuShuffleCoalesce 2147483647
                  +- GpuColumnarExchange gpuhashpartitioning(_gen_alias_217#217, 200), ENSURE_REQUIREMENTS, [id=#952]
                     +- GpuHashAggregate(keys=[_gen_alias_217#217], functions=[]), filters=List())
                        +- GpuProject [col#152.name.firstname AS _gen_alias_217#217]
                           +- GpuInMemoryTableScan [col#152]
                                 +- InMemoryRelation [col#152], StorageLevel(disk, memory, deserialized, 1 replicas)
                                       +- GpuColumnarToRow false
                                          +- GpuProject [named_struct(name, name#57, newname, named_struct(firstname, name#57.firstname, lastname, name#57.lastname)) AS col#152]
                                             +- GpuFileGpuScan parquet [name#57] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[dbfs:/tmp/testparquet], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<name:struct<firstname:string,middlename:string,lastname:string>
  2. Standalone cluster's plan:
== Physical Plan ==
GpuColumnarToRowTransition false
+- GpuHashAggregate(keys=[], functions=[gpucount(distinct _gen_alias_117#117)]), filters=List(None))
   +- GpuShuffleCoalesce 2147483647
      +- GpuColumnarExchange gpusinglepartitioning$(), ENSURE_REQUIREMENTS, [id=#266]
         +- GpuHashAggregate(keys=[], functions=[partial_gpucount(distinct _gen_alias_117#117)]), filters=List(None))
            +- GpuHashAggregate(keys=[_gen_alias_117#117], functions=[]), filters=List())
               +- GpuShuffleCoalesce 2147483647
                  +- GpuColumnarExchange gpuhashpartitioning(_gen_alias_117#117, 200), ENSURE_REQUIREMENTS, [id=#255]
                     +- GpuHashAggregate(keys=[_gen_alias_117#117], functions=[]), filters=List())
                        +- GpuProject [col#62.name.firstname AS _gen_alias_117#117]
                           +- GpuInMemoryTableScan [col#62]
                                 +- InMemoryRelation [col#62], StorageLevel(disk, memory, deserialized, 1 replicas)
                                       +- GpuProject [named_struct(name, name#16, newname, named_struct(firstname, name#16.firstname, lastname, name#16.lastname)) AS col#62]
                                          +- GpuFileGpuScan parquet [name#16] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/tmp/testparquet], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<name:struct<firstname:string,middlename:string,lastname:string>>

Should we worry about that extra GpuColumnarToRow?

@razajafri (Collaborator Author)

@razajafri As per my tests with the fix jar, GpuInMemoryTableScan is in place. However, when comparing the Databricks plan with the Standalone plan, there is an extra GpuColumnarToRow right before the InMemoryRelation. [...] Should we worry about that extra GpuColumnarToRow?

@viadea This is definitely something that I will look into. Can we file an issue for it?

There is a mismatch in the number of params the Databricks test script takes
when it is run from the Databricks nightly test job versus the nightly build job.

The test job takes 1 to 3 params, while the build job requires only 1 to 2 params.
This makes it hard to match up the params between the test job and the build job.

To fix the issue, we explicitly export vars for the test script instead of passing shell params.
This approach also makes it easier to extend the vars/params for the test script.

Signed-off-by: Tim Liu <timl@nvidia.com>
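
The change described in the commit above boils down to reading named environment variables instead of positional shell params. A rough sketch of the idea follows, written in Scala to match the other snippets here even though the real script is Python; the variable names are hypothetical, not the job's actual ones.

object TestJobEnv {
  // Read what the test script needs by name, with defaults, rather than relying
  // on $1/$2/$3 positions that differ between the test and build jobs.
  private def envOr(name: String, default: String): String =
    sys.env.getOrElse(name, default)

  val sparkVersion: String  = envOr("BASE_SPARK_VERSION", "3.1.1")  // hypothetical name
  val extraProfiles: String = envOr("EXTRA_PROFILES", "")           // hypothetical name
}
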
@razajafri (Collaborator Author) commented Jul 8, 2021

LGTM. I can't approve because I am the author; @tgravescs will have to do the honors if he thinks this is good.

@tgravescs are we intentionally not calling super.getExecs at the start of getExecs in this class? If we call super.getExecs we will get all the execs from 311 and the InMemoryTableScanExec change will not be necessary.

@tgravescs (Collaborator) commented Jul 8, 2021

Yes, it was intentional. I'd have to go back and verify, but I don't think there was any overlap; I think Databricks overrides everything with its own version, so it didn't do any good.

@jlowe (Member) commented Jul 8, 2021

I think Databricks overrides everything with its own version, so it didn't do any good.

Even if Databricks overrode everything, wouldn't we still want to try to pick up the base version, in case a new shim for an exec appears that we can just reuse in the future?

@tgravescs (Collaborator)

Sure, then I think we can remove the spark311db version of ParquetCachedBatchSerializer altogether.

@razajafri (Collaborator Author)

Sure, then I think we can remove the spark311db version of ParquetCachedBatchSerializer altogether.

OK, I will push another update to this PR where we get everything from the parent, just as in all the other shims.
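
Roughly, the pattern being adopted looks like the sketch below. The parent class name and the exact rule-map types are assumptions about the shim API, not the precise code:

import com.nvidia.spark.rapids.ExecRule                      // assumed plugin type
import com.nvidia.spark.rapids.shims.spark311.Spark311Shims  // assumed parent shim class
import org.apache.spark.sql.execution.SparkPlan

class Spark311dbShims extends Spark311Shims {
  // Inherit every exec rule from the common 3.1.1 shim and only layer the
  // Databricks-specific overrides on top, instead of re-listing them all.
  override def getExecs: Map[Class[_ <: SparkPlan], ExecRule[_ <: SparkPlan]] =
    super.getExecs ++ Map(
      // Databricks-only exec rules would go here
    )
}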

Signed-off-by: Raza Jafri <rjafri@nvidia.com>
@razajafri (Collaborator Author)

build

@razajafri requested a review from tgravescs July 9, 2021 16:07
jlowe previously approved these changes Jul 9, 2021

@jlowe (Member) left a comment

Looks OK to me, assuming this was tested on Databricks.

@sameerz added the feature request label Jul 9, 2021

import com.nvidia.spark.rapids.shims

class ParquetCachedBatchSerializer extends shims.spark311.ParquetCachedBatchSerializer {
Collaborator

This file shouldn't be needed now, right? The 311 shim version of this works.
Or is the intention to keep the db one so that the user specifies this one? https://nvidia.github.io/spark-rapids/docs/additional-functionality/cache-serializer.html. If that is the case, we need to update the docs.

Originally the intention was that, since the user has to specify it, it should match the version of Spark they are using so that it's hopefully least confusing. I don't know how much it matters whether they specify spark311 vs spark311db.

Collaborator

If it's possible, it would be nice to have one class that just loads the proper shim version, but that is a separate issue.

Collaborator Author

This file shouldn't be needed now, right? The 311 shim version of this works.
Or is the intention to keep the db one so that the user specifies this one? https://nvidia.github.io/spark-rapids/docs/additional-functionality/cache-serializer.html. If that is the case, we need to update the docs.

Originally the intention was that, since the user has to specify it, it should match the version of Spark they are using so that it's hopefully least confusing. I don't know how much it matters whether they specify spark311 vs spark311db.

If we add spark311db to the documentation, we will then also have to add spark311cdh. I almost feel that since they are all Spark 311 and we aren't doing anything specific in the extended versions of their PCBS, we should just get rid of them.

If it's possible, it would be nice to have one class that just loads the proper shim version, but that is a separate issue.

This is a good idea; I can look into it as a follow-on.

Collaborator Author

@jlowe thoughts?

Member

If it's possible, it would be nice to have one class that just loads the proper shim version, but that is a separate issue.

☝️ This. We should not have Spark-specific versions of user-visible classes unless they are truly required (e.g.: as in the shuffle case, unfortunately). If we know one class will work going forward, as is the case with the main executor plugin, then we should strive to use a common class name without a Spark version in it. If this is indeed possible, we should deprecate the old 311 version and eventually remove it.

So it really all comes down to that question. If we can have a common version, my vote is to use the one class. We can change the package name and deprecate the existing spark311 package version in a new PR if it's too tricky to do in this one.
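
A hypothetical illustration of that direction; every name below is invented for the sketch rather than taken from the plugin. The idea is a single stable, user-visible serializer entry point that resolves and instantiates the shim-specific implementation for the running Spark version.

import org.apache.spark.sql.columnar.CachedBatchSerializer

// Hypothetical sketch, not the plugin's real code: resolve the shim package from
// the running Spark version and instantiate its serializer reflectively, so users
// only ever configure one class name.
object ShimSerializerLoader {
  private def shimPackage(sparkVersion: String): String = {
    // Illustrative mapping only; a real resolver would also detect vendor builds
    // such as Databricks.
    if (sparkVersion.startsWith("3.1.2")) "spark312" else "spark311"
  }

  def load(sparkVersion: String): CachedBatchSerializer = {
    val className =
      s"com.nvidia.spark.rapids.shims.${shimPackage(sparkVersion)}.ParquetCachedBatchSerializer"
    Class.forName(className)
      .getDeclaredConstructor()
      .newInstance()
      .asInstanceOf[CachedBatchSerializer]
  }
}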

Collaborator Author

OK. I can do this as a follow-on. In the interim, do we just update the doc, or remove the spark311db and spark311cdh versions of the serializer? I feel removing the db and cdh versions of the serializer is the way to go, as we will do more work as part of the follow-on.

Member

If we're planning on removing these classes in the near future then we should not document them only to rip them out immediately afterward. Let's keep the number of classes to deprecate to a minimum.

jlowe previously approved these changes Jul 9, 2021

@jlowe (Member) left a comment

Looks OK to me, assuming the other Spark-specific cache serializers (e.g., spark312) will be removed in a follow-up.

Signed-off-by: Raza Jafri <rjafri@nvidia.com>
@razajafri (Collaborator Author)

build

Signed-off-by: Raza Jafri <rjafri@nvidia.com>
@razajafri (Collaborator Author)

build

@razajafri requested a review from tgravescs July 12, 2021 17:35
tgravescs previously approved these changes Jul 12, 2021

@jlowe (Member) left a comment

Looks like some more unneeded imports in the Databricks shim.

…spark311db/Spark311dbShims.scala

Co-authored-by: Jason Lowe <jlowe@nvidia.com>
…spark311db/Spark311dbShims.scala

Co-authored-by: Jason Lowe <jlowe@nvidia.com>
@razajafri (Collaborator Author)

build

@razajafri merged commit 434b911 into NVIDIA:branch-21.08 Jul 12, 2021
@razajafri deleted the db_cache branch July 15, 2021 01:17
@pxLi (Collaborator) commented Jul 24, 2021

#3016, and made a fix at #3018

Labels: feature request

Successfully merging this pull request may close these issues:
[BUG] cache of struct does not work on databricks 8.2ML

7 participants