
[QST] Question about using UDF to implement operations. #2535

Closed
tregodev opened this issue May 28, 2021 · 6 comments

@tregodev

Hey all!

As part of my thesis I am doing research on spark-rapids, comparing GPU and CPU processing for biological sequencing, essentially constructing De Bruijn graphs from a large text file. The part of the code I want to accelerate is fairly simple; the only operations I need that are not already implemented are collect_set and zipWithIndex.

Is there any way to implement these as UDFs in a GPU-accelerated way?

The data I want to use with both functions is (String, Long), making the code look something like this:

// Read (kmer, source_seq) pairs from CSV.
val inFile = spark.read.schema("kmer STRING, source_seq LONG").csv(inPath).toDF
// For each kmer, collect the distinct source sequences, sorted so the set forms a stable key.
val collectedSourceSeqs = inFile.groupBy("kmer").agg(sort_array(collect_set("source_seq")).as("source_seqs"))
// Group together the kmers that share the same set of source sequences.
val collectedSets = collectedSourceSeqs.groupBy("source_seqs").agg(collect_set("kmer").as("kmers"))

As far as I can tell, collect_list is supported in windowing, but dropDuplicates is not supported on list types, and zipWithIndex is not available on DataFrames. However, I have been using a function that mostly translates the operation to the DataFrame API:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

def zipWithIndex(df: DataFrame, offset: Long = 1, indexName: String = "index"): DataFrame = {
  // Tag each row with its partition id and a monotonically increasing id.
  val dfWithPartitionId = df
    .withColumn("partition_id", spark_partition_id())
    .withColumn("inc_id", monotonically_increasing_id())
  // Compute, per partition, the offset that turns the monotonic ids into a global index.
  val partitionOffsets = dfWithPartitionId
    .groupBy("partition_id")
    .agg(count(lit(1)) as "cnt", first("inc_id") as "inc_id")
    .orderBy("partition_id")
    .select(sum("cnt").over(Window.orderBy("partition_id")) - col("cnt") - col("inc_id") + lit(offset) as "cnt")
    .collect()
    .map(_.getLong(0))
  // Look up each row's partition offset (via a UDF) and add it to the partition-local id.
  val pof = udf((partitionId: Int) => partitionOffsets(partitionId))
  dfWithPartitionId
    .withColumn("partition_offset", pof(col("partition_id")))
    .withColumn(indexName, col("partition_offset") + col("inc_id"))
    .drop("partition_id", "partition_offset", "inc_id")
}
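
For context, a rough usage sketch of this helper (the names here are just illustrative, reusing collectedSets from the example above):

// Hypothetical usage: give each group of kmers a stable 1-based id.
val indexedSets = zipWithIndex(collectedSets, offset = 1, indexName = "set_id")
indexedSets.select("set_id", "kmers").show(5, truncate = false)
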
@tregodev added the "? - Needs Triage" and "question" labels on May 28, 2021
@revans2
Collaborator

revans2 commented May 28, 2021

We support UDFs (sort of). If the UDF is really simple and can be translated into a Catalyst expression, then we can do some things with it if you turn that on (very experimental). I don't think what you are doing is something we support yet for translation to Catalyst. The other option is that you can write your own UDFs, either using CUDA directly or using the Java cudf API; that gives you a lot of control.

But it looks like your UDF is really just a join: you have an array mapping partition ids to some other number, and you want to look a value up based on the partition id. That is a join.
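
For illustration, a rough sketch of that join-based alternative, reusing dfWithPartitionId and partitionOffsets from the helper above (this is only a sketch of how it could be written in plain Spark, not spark-rapids API):

import org.apache.spark.sql.functions._
import spark.implicits._  // assumed in scope, needed for toDF on a local Seq

// Turn the partition-id -> offset array into a small lookup DataFrame.
val offsetsDf = partitionOffsets.zipWithIndex
  .map { case (off, pid) => (pid, off) }
  .toSeq
  .toDF("partition_id", "partition_offset")

// Broadcast-join the small lookup table instead of calling a row-level UDF.
val withIndex = dfWithPartitionId
  .join(broadcast(offsetsDf), Seq("partition_id"))
  .withColumn("index", col("partition_offset") + col("inc_id"))
  .drop("partition_id", "partition_offset", "inc_id")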

As a side note, we are working on collect_list and collect_set for aggregations. It will probably be a few more releases before we can support them in Spark, but the cudf code does support them (we just cannot do it distributed yet). sort_array is another one that we do not officially support, but the latest cudf does (no Java APIs for it yet, though). cudf also does not support grouping by lists of things yet. So there is a lot of work to get this functioning. I'll see what I can come up with though.

@tregodev
Author

Thank you for the quick and detailed response!

How about collect_set in windowing? I am under the impression that the cudf library supports collect_set in its Java API.
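
For reference, the window form I have in mind looks roughly like this (plain Spark API, column names from my example above; just a sketch):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Collect the distinct source sequences per kmer with a window instead of a group-by.
val w = Window.partitionBy("kmer")
val windowedSets = inFile.withColumn("source_seqs", collect_set("source_seq").over(w))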

@revans2
Collaborator

revans2 commented May 28, 2021

cudf just did a code freeze for our next release, and we will be doing our own code freeze shortly, so remembering what is in previous releases gets to be a bit complicated. https://nvidia.github.io/spark-rapids/docs/supported_ops.html should list all of the operations supported in the current release on Apache Spark 3.0.0.

collect_set is not supported for window operations in our current release, 0.5; collect_list is. collect_set was added in the cudf release that just froze, so I am hopeful that it will be available for window operations in our upcoming release 21.06.0 (we are moving to calendar versioning). But there is no PR up for it yet, so it might not make it in by the code freeze this Friday. For collect_set and collect_list to work the way Apache Spark wants them to, we need a group-by aggregation that concatenates lists and sets, but that didn't make it into cudf before the code freeze, so it will probably be a few releases before we can support it.

@jlowe I don't think we support UDAFs yet for RapidsUDFs. Do we?

@jlowe
Member

jlowe commented May 28, 2021

@revans2 correct, UDAFs are not yet supported.

@sameerz removed the "? - Needs Triage" label on Jun 1, 2021
@sameerz
Collaborator

sameerz commented Jul 16, 2021

collect_set for windowing will be supported in the upcoming 21.08 release. The overarching issue for collect aggregations is #2062. The collect_set for windowing PR is #2548.

@jlowe
Member

jlowe commented Jul 16, 2021

Closing this as answered. Feel free to reopen if there's more to discuss.

@jlowe closed this as completed on Jul 16, 2021
@NVIDIA locked and limited conversation to collaborators on Apr 27, 2022
@sameerz converted this issue into discussion #5343 on Apr 27, 2022

This issue was moved to discussion #5343. You can continue the conversation there.
