
Bringing Scala DataFrames into PySpark #209

Closed
lintool opened this issue Apr 27, 2018 · 7 comments

lintool (Member) commented Apr 27, 2018

@greebie

How to connect the existing DF code in Scala to PySpark:
https://stackoverflow.com/questions/36023860/how-to-use-a-scala-class-inside-pyspark
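The short version of the pattern from that answer: reach the Scala class through the py4j gateway that PySpark exposes as sc._jvm, call a method that returns a Scala/Java DataFrame, and wrap the result in a pyspark.sql.DataFrame. A compressed sketch, using the DataFrameLoader class that appears in the next comment:

from pyspark.sql import DataFrame

# The py4j gateway gives direct access to JVM classes from Python.
jvm = sc._jvm
loader = jvm.io.archivesunleashed.DataFrameLoader(sc._jsc.sc())

# The Scala method hands back a Java DataFrame reference; wrapping it
# makes the usual PySpark DataFrame API available on the Python side.
pages = DataFrame(loader.extractValidPages("warcs/"), sqlContext)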

lintool (Member Author) commented Apr 27, 2018

Got DataFrames working in PySpark!

pyspark --driver-class-path target/ --jars target/aut-0.16.1-SNAPSHOT-fatjar.jar

Loading the imports, the Scala DataFrameLoader, and a domain-extraction UDF:

from pyspark.sql import DataFrame
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Reach the Scala side through the py4j gateway and instantiate the loader.
jvm = sc._jvm
loader = jvm.io.archivesunleashed.DataFrameLoader(sc._jsc.sc())

# Strip the scheme, any path, and a leading "www." to get the bare domain.
def extract_domain(url):
  url = url.replace('http://', '').replace('https://', '')
  if '/' in url:
    return url.split('/')[0].replace('www.', '')
  else:
    return url.replace('www.', '')

# Register for use in SQL strings, and wrap as a column UDF for the DataFrame API.
sqlContext.udf.register("extract_domain", extract_domain, StringType())
extract_domain_udf = udf(extract_domain, StringType())

Actually running analyses:

pages = DataFrame(loader.extractValidPages("warcs/"), sqlContext)
pages.printSchema()

pages.select(extract_domain_udf("Url").alias("Domain")) \
  .groupBy("Domain").count().orderBy("count", ascending=False).show()

links = DataFrame(loader.extractHyperlinks("warcs/"), sqlContext)

links.printSchema()
links.select(extract_domain_udf("Src").alias("SrcDomain"), extract_domain_udf("Dest").alias("DestDomain")) \
  .groupBy("SrcDomain", "DestDomain").count().orderBy("count", ascending=False).show()

lintool (Member Author) commented Apr 27, 2018

One more thing you need to do to get the above to work:
https://stackoverflow.com/questions/42942620/spark-2-1-error-while-instantiating-hivesessionstate/43364011
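For anyone who hits that error: it usually comes down to the Spark SQL warehouse / Hive scratch directory not being writable. One commonly suggested workaround (an illustration of the general fix, not a quote of the linked answer) is to point the warehouse at a writable location when launching PySpark:

pyspark --conf spark.sql.warehouse.dir=file:///tmp/spark-warehouse --driver-class-path target/ --jars target/aut-0.16.1-SNAPSHOT-fatjar.jar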

lintool (Member Author) commented May 1, 2018

@ianmilligan1 Check out branch df-pytorch

Start PySpark as follows:

pyspark --driver-class-path target/ --py-files src/main/python/aut.py  --jars target/aut-0.16.1-SNAPSHOT-fatjar.jar

Then you should be able to do the following:

from aut import *

archive = web_archive(sc, sqlContext, "warcs/")

pages = archive.pages()
pages.printSchema()

pages.select(extract_domain("Url").alias("Domain")) \
    .groupBy("Domain").count().orderBy("count", ascending=False).show()

links = archive.links()
links.printSchema()

links.select(extract_domain("Src").alias("SrcDomain"), extract_domain("Dest").alias("DestDomain")) \
    .groupBy("SrcDomain", "DestDomain").count().orderBy("count", ascending=False).show()

I've tried to make it as simple as possible... give it a whirl and let me know!

ianmilligan1 (Member) commented:

Working like a charm in Jupyter; see the screenshot below:

[screenshot: screen shot 2018-05-01 at 7.57.04 am]

lintool (Member Author) commented May 1, 2018

Do you like the experience? I have a few more minor tweaks to follow more standard Python conventions, but if it looks good, I'll send a PR and we can get this merged in.

ianmilligan1 (Member) commented:

Yeah, this is great. Sounds perfect!

lintool (Member Author) commented May 1, 2018

See #214 for PR. The PR includes proper refactoring into Python modules and integration into Maven to create the "deploy" zip.

You'll need to run Maven to build the deploy zip:

mvn clean install -DskipTests

Then start PySpark with the AUT Python bindings as follows:

pyspark --driver-class-path target/ --py-files target/aut.zip --jars target/aut-0.16.1-SNAPSHOT-fatjar.jar

Then the following should work:

from aut import *

archive = WebArchive(sc, sqlContext, "warcs/")

pages = archive.pages()
pages.printSchema()

pages.select(extract_domain("Url").alias("Domain")) \
    .groupBy("Domain").count().orderBy("count", ascending=False).show()

links = archive.links()
links.printSchema()

links.select(extract_domain("Src").alias("SrcDomain"), extract_domain("Dest").alias("DestDomain")) \
    .groupBy("SrcDomain", "DestDomain").count().orderBy("count", ascending=False).show()

Note that web_archive has been renamed to WebArchive to conform to Python coding conventions (CapWords for class names).
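For readers who just want the shape of the Python side, here is a minimal sketch of what a wrapper along these lines could look like. The Scala-side names (DataFrameLoader, extractValidPages, extractHyperlinks) come from the snippets earlier in this thread; everything else is illustrative rather than the actual code in #214:

from pyspark.sql import DataFrame
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def _extract_domain(url):
    # Strip the scheme, any path, and a leading "www." to get the bare domain.
    url = url.replace('http://', '').replace('https://', '')
    if '/' in url:
        url = url.split('/')[0]
    return url.replace('www.', '')

# Column-level UDF, usable as extract_domain("Url") inside select().
extract_domain = udf(_extract_domain, StringType())

class WebArchive:
    def __init__(self, sc, sqlContext, path):
        self.sqlContext = sqlContext
        self.path = path
        # Instantiate the Scala-side loader through the py4j gateway.
        self.loader = sc._jvm.io.archivesunleashed.DataFrameLoader(sc._jsc.sc())

    def pages(self):
        # Wrap the Scala DataFrame returned by the loader in a PySpark DataFrame.
        return DataFrame(self.loader.extractValidPages(self.path), self.sqlContext)

    def links(self):
        return DataFrame(self.loader.extractHyperlinks(self.path), self.sqlContext)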
