
PySpark performance bottlenecks: counting values #130

Closed
ianmilligan1 opened this issue Nov 30, 2017 · 10 comments

@ianmilligan1
Member

One of the core scripts that we use does the following:

  • takes a WARC or collection of WARCs;
  • extracts the hyperlinks, noting where they're from, where they're pointing to, and when the snapshot was taken;
  • reduces to domains (i.e. liberal.ca/smith becomes liberal.ca) and cleans up cruft (so that www.liberal.ca and liberal.ca get folded into the same domain);
  • assembles it all so you get (SOURCE, TARGET, NUMBER OF LINKS) output for a given year, year-month, or year-month-day (e.g. how often the Liberal Party of Canada linked out to the Green Party of Canada);
  • and provides nice output.

Here's the PySpark script:

import RecordLoader
from DFTransformations import *
from ExtractDomain import ExtractDomain
from ExtractLinks import ExtractLinks
from pyspark.sql import SparkSession
import re

path = "src/test/resources/arc/example.arc.gz"
spark = SparkSession.builder.appName("siteLinkStructureByDate").getOrCreate()
sc = spark.sparkContext


df = RecordLoader.loadArchivesAsDF(path, sc, spark)
fdf = df.select(df['crawlDate'], df['url'], df['contentString'])
rdd = fdf.rdd
rddx = rdd.map(lambda r: (r.crawlDate, ExtractLinks(r.url, r.contentString)))\
 .flatMap(lambda r: map(lambda f: (r[0], ExtractDomain(f[0]), ExtractDomain(f[1])), r[1]))\
 .filter(lambda r: r[-1] is not None)\
 .map(lambda r: (r[0], re.sub(r'^.*www\.', '', r[1]), re.sub(r'^.*www\.', '', r[2])))\
 .countByValue()

print([((x[0], x[1], x[2]), y) for x, y in rddx.items()]) 

Timing shows that the .countByValue() call accounts for roughly 98% of the total run time, which makes the script pretty much useless for production at scale (even a few GB of WARCs and you're waiting hours).

How can we speed this up?

@ianmilligan1
Member Author

Swapping in print(countItems(rddx).filter(lambda r: r[1] > 5).take(10)) in place of countByValue() produces roughly the same timings.

@greebie
Contributor

greebie commented Nov 30, 2017

Note that countByValue() is a method of the PySpark RDD class that builds a defaultdict out of the entries. It also requires PYTHONHASHSEED=x to be set so that string hashing is consistent across the worker cores.

If there is a generator function that could perform this better, that would be great. It's possible that using DataFrame counts instead would help; Spark suggests using reduceByKey.

Also, for reasons I do not quite understand, the product of the map is a list of pyspark.sql Row objects. That again suggests that we (royal we, so I) should learn to use DataFrames instead.
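
For what it's worth, here's a rough sketch of the DataFrame-count idea, reusing the extraction pipeline from the script above but cutting the chain before any action so the aggregation happens inside Spark (the column names are invented for illustration):

import re

import RecordLoader
from ExtractDomain import ExtractDomain
from ExtractLinks import ExtractLinks
from pyspark.sql import SparkSession

path = "src/test/resources/arc/example.arc.gz"
spark = SparkSession.builder.appName("siteLinkStructureByDate").getOrCreate()
sc = spark.sparkContext

df = RecordLoader.loadArchivesAsDF(path, sc, spark)
rdd = df.select(df['crawlDate'], df['url'], df['contentString']).rdd

# Same extraction steps as the original script, but no action is triggered yet.
links = rdd.map(lambda r: (r.crawlDate, ExtractLinks(r.url, r.contentString)))\
 .flatMap(lambda r: [(r[0], ExtractDomain(f[0]), ExtractDomain(f[1])) for f in r[1]])\
 .filter(lambda r: r[-1] is not None)\
 .map(lambda r: (r[0], re.sub(r'^.*www\.', '', r[1]), re.sub(r'^.*www\.', '', r[2])))

# Count identical (date, source, target) triples inside Spark instead of
# pulling a Python dict back to the driver with countByValue().
counts = links.toDF(["crawl_date", "src_domain", "dst_domain"])\
 .groupBy("crawl_date", "src_domain", "dst_domain")\
 .count()

counts.orderBy("count", ascending=False).show(10)

No idea yet whether that actually beats countByValue() on real collections, but at least the counting stays distributed.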

@ianmilligan1
Member Author

Today is one of those "working hard and not achieving anything" days.

But as you'll see below, I've tried to implement reduceByKey to no avail: timings simply aren't improving. The results are identical to the countByValue() version, both in output and in run time (still ~180 seconds).

df = RecordLoader.loadArchivesAsDF(path, sc, spark)
fdf = df.select(df['crawlDate'], df['url'], df['contentString'])
rdd = fdf.rdd
rddx = rdd.map(lambda r: (r.crawlDate, ExtractLinks(r.url, r.contentString)))\
 .flatMap(lambda r: map(lambda f: (r[0], ExtractDomain(f[0]), ExtractDomain(f[1])), r[1]))\
 .filter(lambda r: r[-1] is not None)\
 .map(lambda r: (r[0], re.sub(r'^.*www\.', '', r[1]), re.sub(r'^.*www\.', '', r[2])))\
 .map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y).collect()

The SQL approach might be a good one to investigate?
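
If we do go the SQL route, a minimal sketch would be to put the cleaned links into a DataFrame, register a temp view, and let Spark SQL do the grouping (the view and column names here are invented):

# `links` is the cleaned RDD of (crawlDate, source domain, target domain)
# triples from the sketch further up.
links_df = links.toDF(["crawl_date", "src_domain", "dst_domain"])
links_df.createOrReplaceTempView("site_links")

spark.sql("""
    SELECT crawl_date, src_domain, dst_domain, COUNT(*) AS n_links
    FROM site_links
    GROUP BY crawl_date, src_domain, dst_domain
    ORDER BY n_links DESC
""").show(10)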

@greebie
Contributor

greebie commented Dec 3, 2017

Spark DataFrames have an explode() function (pyspark.sql.functions.explode) which may be able to duplicate the flatMap call above. If we can stay in the DataFrame, I think things may improve.
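
A sketch of how that could look, assuming explode here means pyspark.sql.functions.explode and that ExtractLinks can be wrapped as a UDF returning an array of (src, dst) pairs (the wrapper, schema, and column names below are hypothetical):

from pyspark.sql.functions import explode, udf
from pyspark.sql.types import ArrayType, StringType, StructField, StructType

# Hypothetical wrapper: expose ExtractLinks as a UDF that returns an array of structs.
link_schema = ArrayType(StructType([
    StructField("src", StringType()),
    StructField("dst", StringType()),
]))
extract_links = udf(lambda url, content: ExtractLinks(url, content), link_schema)

# `df` is the DataFrame loaded by RecordLoader in the original script.
exploded = df.withColumn("link", explode(extract_links(df["url"], df["contentString"])))\
 .select("crawlDate", "link.src", "link.dst")

exploded.groupBy("crawlDate", "src", "dst").count().show(10)

The domain reduction and www-stripping would still need the same treatment, either as another UDF or with pyspark.sql.functions.regexp_replace.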

@greebie
Contributor

greebie commented Jan 11, 2018

I think the ultimate solution is to have the Python code play nicely with our Scala UDFs, but there could be other options. It would be better to have someone with stronger programming skills work on this.
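
Just to capture the idea: if the Scala extractors were exposed as Java UDF classes (org.apache.spark.sql.api.java.UDF1 and friends), PySpark could register and call them without the data round-tripping through Python. The class name below is made up, and registerJavaFunction lives on spark.udf in Spark 2.3+ (sqlContext.registerJavaFunction in earlier releases):

from pyspark.sql.types import StringType

# Hypothetical class name: whatever the Scala/Java wrapper ends up being called.
spark.udf.registerJavaFunction("extract_domain",
                               "io.archivesunleashed.udfs.ExtractDomain",
                               StringType())

# `df` is the DataFrame loaded by RecordLoader in the original script.
df.createOrReplaceTempView("records")
spark.sql("""
    SELECT crawlDate, extract_domain(url) AS domain, COUNT(*) AS pages
    FROM records
    GROUP BY crawlDate, extract_domain(url)
""").show(10)

That would keep both the extraction and the aggregation in the JVM, with Python only orchestrating.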

@ianmilligan1
Member Author

@dhop is taking this issue on!

@greebie
Contributor

greebie commented Feb 7, 2018

@dhop This issue has the scripts for testing performance and the record of past tests we conducted. Might make things a little easier for testing... #121

greebie closed this as completed Feb 7, 2018
greebie reopened this Feb 7, 2018
@greebie
Contributor

greebie commented Feb 7, 2018

(mea culpa - pressed wrong button)

@ruebot
Member

ruebot commented May 2, 2018

Is this issue still relevant with #214 being merged? And, should we also close this in favour of #215?

@lintool
Member

lintool commented May 2, 2018

Yup!

lintool closed this as completed May 2, 2018