DataFrame discussion: open thread #190

Closed
lintool opened this issue Apr 5, 2018 · 35 comments

@lintool
Member

lintool commented Apr 5, 2018

I've been playing a bit with DataFrames. Open thread just to share what the "user experience" might look like. I've got things working in my own local repo, but I'm hiding a bit of the magic here.

Assume we've got a DataFrame created. Here's the schema:

scala> df.printSchema()
root
 |-- CrawlDate: string (nullable = true)
 |-- Url: string (nullable = true)

Show the rows:

scala> df.show()
+---------+--------------------+
|CrawlDate|                 Url|
+---------+--------------------+
| 20080430|http://www.archiv...|
| 20080430|http://www.archiv...|
| 20080430|http://www.archiv...|
| 20080430|http://www.archiv...|
| 20080430|http://www.archiv...|
| 20080430|http://www.archiv...|
| 20080430|http://www.archiv...|
| 20080430|http://www.archiv...|
| 20080430|http://www.archiv...|
| 20080430|http://www.archiv...|
| 20080430|http://www.archiv...|
| 20080430|http://www.archiv...|
| 20080430|http://www.archiv...|
| 20080430|http://www.archiv...|
| 20080430|http://www.archiv...|
| 20080430|http://www.archiv...|
| 20080430|http://www.archiv...|
| 20080430|http://www.archiv...|
| 20080430|http://www.archiv...|
| 20080430|http://www.archiv...|
+---------+--------------------+
only showing top 20 rows

We can extract domains from the URLs, and then group by and count as follows:

scala> df.select(Domain($"url").as("Domain")).groupBy("Domain").count().show()
+------------------+-----+
|            Domain|count|
+------------------+-----+
|   www.archive.org|  132|
|www.hideout.com.br|    1|
|     deadlists.com|    2|
+------------------+-----+
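
For readers without a Spark shell handy, the query above can be mimicked on a plain Scala collection. This is only a sketch of what the extract, group, and count pipeline computes, not the code behind `Domain`: `domainOf` and the sample URLs are made up for illustration, and host extraction via `java.net.URI` is an assumption about what the UDF does.

```scala
import java.net.URI
import scala.util.Try

// Hypothetical stand-in for the Domain UDF: the host of a URL, or "" if it cannot be parsed.
def domainOf(url: String): String =
  Try(Option(new URI(url).getHost)).toOption.flatten.getOrElse("")

// Made-up sample input, including one malformed entry.
val urls = Seq(
  "http://www.archive.org/index.php",
  "http://www.archive.org/details/movies",
  "http://deadlists.com/",
  "not a url"
)

// Local equivalent of select(Domain(url)).groupBy("Domain").count().
val counts: Map[String, Int] =
  urls.map(domainOf).groupBy(identity).map { case (domain, hits) => domain -> hits.size }

counts.foreach { case (domain, n) => println(s"$domain\t$n") }
```

The DataFrame version expresses the same three steps, but Spark distributes them across the collection.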

Thoughts?

@ruebot
Member

ruebot commented Apr 5, 2018

I like it.

@greebie
Contributor

greebie commented Apr 5, 2018

I think the main advantage is that it follows (roughly) a typical SQL select statement, which is something that most technical librarians would be comfortable with.

I wonder if parquet format would help us down the road? https://parquet.apache.org/

There are Ruby gems for the format -- might be useful if we get to petabyte-size archives a decade from now.

@lintool
Member Author

lintool commented Apr 5, 2018

Or this:

scala> df.printSchema()
root
 |-- src: string (nullable = true)
 |-- dest: string (nullable = true)


scala> df.show()
+--------------------+--------------------+
|                 src|                dest|
+--------------------+--------------------+
|http://www.archiv...|http://www.archiv...|
|http://www.archiv...|http://www.archiv...|
|http://www.archiv...|http://web.archiv...|
|http://www.archiv...|http://www.sloan.org|
|http://www.archiv...|http://www.archiv...|
|http://www.archiv...|http://www.archiv...|
|http://www.archiv...|ftp://ia311502.us...|
|http://www.archiv...|http://ia311502.u...|
|http://www.archiv...|http://www.archiv...|
|http://www.archiv...|http://www.alexa.com|
|http://www.archiv...|http://download.a...|
|http://www.archiv...|http://www.alexa.com|
|http://www.archiv...|http://www.prelin...|
|http://www.archiv...|http://www.prelin...|
|http://www.archiv...|  http://www.nsf.gov|
|http://www.archiv...|  http://www.nsf.gov|
|http://www.archiv...|  http://www.loc.gov|
|http://www.archiv...|  http://www.loc.gov|
|http://www.archiv...|http://www.lizard...|
|http://www.archiv...|http://www.lizard...|
+--------------------+--------------------+
only showing top 20 rows


scala> df.select(Domain($"src").as("src"), Domain($"dest").as("dest")).groupBy("src", "dest").count().show()
+---------------+--------------------+-----+
|            src|                dest|count|
+---------------+--------------------+-----+
|www.archive.org|         www.cfp.org|    1|
|www.archive.org|         www.acm.org|    4|
|www.archive.org|     www.mozilla.org|    1|
|www.archive.org|internetarchive.w...|    2|
|www.archive.org|www.informedia.cs...|    1|
|www.archive.org|    www.smartftp.com|    1|
|  deadlists.com|   www.deadlists.com|    2|
|www.archive.org| www.intermemory.org|    1|
|www.archive.org|     www.nytimes.com|    1|
|www.archive.org|    www.mplayerhq.hu|    1|
|www.archive.org|      www.cygwin.com|    1|
|www.archive.org|     www.archive.org|  305|
|www.archive.org|      lcweb2.loc.gov|    1|
|www.archive.org|        www.oclc.org|    1|
|www.archive.org|    www.mikewren.com|    4|
|www.archive.org|  hotwired.lycos.com|    1|
|www.archive.org|         www.eff.org|    1|
|www.archive.org|         www.ipl.org|    1|
|www.archive.org|     www.privacy.org|    5|
|www.archive.org| www.illiminable.com|    2|
+---------------+--------------------+-----+
only showing top 20 rows

Or, in straight-up SQL if you prefer:

scala> spark.sql("SELECT Domain(src), Domain(dest), count(*) FROM pages GROUP BY Domain(src), Domain(dest)").show()
+---------------+--------------------+--------+
|UDF:Domain(src)|    UDF:Domain(dest)|count(1)|
+---------------+--------------------+--------+
|www.archive.org|         www.cfp.org|       1|
|www.archive.org|         www.acm.org|       4|
|www.archive.org|     www.mozilla.org|       1|
|www.archive.org|internetarchive.w...|       2|
|www.archive.org|www.informedia.cs...|       1|
|www.archive.org|    www.smartftp.com|       1|
|  deadlists.com|   www.deadlists.com|       2|
|www.archive.org| www.intermemory.org|       1|
|www.archive.org|     www.nytimes.com|       1|
|www.archive.org|    www.mplayerhq.hu|       1|
|www.archive.org|      www.cygwin.com|       1|
|www.archive.org|     www.archive.org|     305|
|www.archive.org|      lcweb2.loc.gov|       1|
|www.archive.org|        www.oclc.org|       1|
|www.archive.org|    www.mikewren.com|       4|
|www.archive.org|  hotwired.lycos.com|       1|
|www.archive.org|         www.eff.org|       1|
|www.archive.org|         www.ipl.org|       1|
|www.archive.org|     www.privacy.org|       5|
|www.archive.org| www.illiminable.com|       2|
+---------------+--------------------+--------+
only showing top 20 rows
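
For the SQL form to run, the DataFrame has to be visible under a table name and the function has to be registered with the session. A minimal sketch of that setup using standard Spark calls (`createOrReplaceTempView`, `spark.udf.register`), assuming a local session and toy data; the body of `Domain` here is a hypothetical stand-in, not the project's implementation:

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a throwaway local session; in spark-shell, `spark` already exists.
val spark = SparkSession.builder().master("local[1]").appName("sql-sketch").getOrCreate()
import spark.implicits._

// Toy link data standing in for the real src/dest DataFrame.
val links = Seq(
  ("http://www.archive.org/a", "http://www.loc.gov/x"),
  ("http://www.archive.org/b", "http://www.loc.gov/y"),
  ("http://www.archive.org/c", "http://www.nsf.gov/z")
).toDF("src", "dest")

// Expose the DataFrame to SQL under the table name used in the query above.
links.createOrReplaceTempView("pages")

// Hypothetical stand-in for Domain: the host of the URL, or "" if unparseable.
spark.udf.register("Domain", (url: String) =>
  scala.util.Try(Option(new java.net.URI(url).getHost)).toOption.flatten.getOrElse(""))

val linkCounts = spark.sql(
  """SELECT Domain(src) AS src, Domain(dest) AS dest, count(*) AS n
    |FROM pages
    |GROUP BY Domain(src), Domain(dest)""".stripMargin)

linkCounts.show()
```

Aliasing the columns in the `SELECT` avoids the auto-generated `UDF:Domain(src)` headers seen in the output above.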

@greebie
Contributor

greebie commented Apr 5, 2018

Yeah -- coding dataframes is definitely more intuitive than mapping through RDDs. Lambdas are oddly confusing unless you are used to them.

@cjer

cjer commented Apr 5, 2018

Major like on this. It would make it much easier for R- and pandas-minded people to get into Scala code.

@ianmilligan1
Member

This is great, @lintool! It would let people interactively explore rather than using our rote scripts.

I don't think we could expect historians to know SQL syntax – I don't know it, for example – but with documentation I think the df.select syntax seems the most out-of-the-box readable. I'd love to see this move forward, and the bonus is I assume it's just added functionality and doesn't disrupt any of our existing scripts?

@greebie
Contributor

greebie commented Apr 5, 2018

I think that once we implement scala dataframes, the pressure will be on to get it into pyspark. In particular the .toPandas() method, which would be amazing for mash-ups of Web archives against other social science data. @ianmilligan1 : .toPandas() would mean I could get my correspondence analysis graphs made directly from a collection of warcs, rather than from saved derivatives, albeit with a time lag.

@lintool
Member Author

lintool commented Apr 5, 2018

Aye, here's the rub, though... I can imagine three different ways of doing analyses:

  1. current way w/ RDDs + transformations
  2. what I call fluent SQL (e.g., df.select)
  3. actual SQL queries

And, in theory, you multiply those by the number of languages (e.g., Scala, Python, R)

However, it will be a huge pain to make sure all the functionalities are exactly the same - for example, UDFs might be defined a bit differently, so we need to make sure all versions are in sync.

IMO we should standardize on a "canonical way" and then offer people "use at your own risk" functionalities.

@greebie
Contributor

greebie commented Apr 5, 2018

Do you think it's possible to call UDFs from Scala via PySpark as per #148? In theory it looks possible, but @MapleOx had challenges getting it to work. Ideally, all pyspark calls would be running Scala UDFs with a small set of helper scripts. Or we could go the direction of using AUT plugins that are "use at your own risk."

@lintool
Member Author

lintool commented Apr 5, 2018

For ExtractDomain in the above example, I'm wrapping the existing UDF... but it's kinda a hairball. It'll no doubt be more complicated across languages, and likely cause performance issues.
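
As a rough illustration of what such wrapping can look like (not the actual code on the branch), a plain Scala function can be lifted to a column function with Spark's `udf`. `PlainHelpers.extractDomain` is a hypothetical stand-in for the existing helper, and the local session and sample rows are assumptions made to keep the sketch self-contained:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf

// Assumption: a throwaway local session; in spark-shell, `spark` already exists.
val spark = SparkSession.builder().master("local[1]").appName("udf-sketch").getOrCreate()
import spark.implicits._

// Hypothetical stand-in for an existing string-to-string helper.
object PlainHelpers extends Serializable {
  def extractDomain(url: String): String =
    scala.util.Try(Option(new java.net.URI(url).getHost)).toOption.flatten.getOrElse("")
}

// Lift the plain function so it can be applied to DataFrame columns.
val ExtractDomainUDF = udf((url: String) => PlainHelpers.extractDomain(url))

// Made-up sample rows.
val pages = Seq("http://www.uvic.ca/a", "http://twitter.com/b", "http://www.uvic.ca/c").toDF("Url")

val domainCounts = pages
  .select(ExtractDomainUDF($"Url").as("Domain"))
  .groupBy("Domain")
  .count()

domainCounts.show()
```

A wrapper like this lives on the Scala side only, which is part of why keeping several language bindings in sync is a concern.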

One concrete proposal would be to deprecate the current RDDs + transformations in favor of fluent SQL. Not right now, but plan for it...

@lintool
Member Author

lintool commented Apr 5, 2018

@greebie on the data you just handed me...

scala> df.select(Domain($"Url").as("Domain")).groupBy("Domain").count().orderBy(desc("count")).show()
+--------------------+-----+                                                    
|              Domain|count|
+--------------------+-----+
|notstrategic.blog...|  292|
|       www.uvicfa.ca|   73|
|  uvfawhycertify.org|   70|
|    www.facebook.com|   66|
|notstrategic.blog...|   63|
|www.theglobeandma...|   35|
|     www.blogger.com|   22|
|beta.images.thegl...|   20|
|         www.uvic.ca|   19|
|         twitter.com|   12|
|      www.martlet.ca|    6|
|sec.theglobeandma...|    5|
|      dublincore.org|    5|
|     plus.google.com|    4|
|  mobile.twitter.com|    4|
|          thetyee.ca|    4|
| accounts.google.com|    3|
|     web.adblade.com|    3|
|            purl.org|    2|
|     www.vicnews.com|    2|
+--------------------+-----+
only showing top 20 rows

@greebie
Contributor

greebie commented Apr 5, 2018

Beauty! I think the long view should be to move to dataframes more intensely. My little bit of research suggests that dataframes to parquet is where the real big data is at right now. If not parquet, then probably some other column-based/matrix-oriented format. WARCs are definitely not going to get smaller over time -- any way to boost speed and shrink storage will be very important down the road.

However, by "long view" I'm thinking 5+ years. Getting historians and non-data scientists engaged is much more important in the short run, and that's going to require more user-friendly approaches vs super-powerful data formats.

@lintool
Member Author

lintool commented Apr 5, 2018

@greebie I'm familiar with Parquet but that's not very relevant for our work... that's a physical format, and it's unlikely that organizations are going to rewrite their TBs of holdings...

@greebie
Contributor

greebie commented Apr 5, 2018

I've just been seeing some papers that suggest dataframes --> parquet are the way to go for huge datasets. There are also suggestions that writing df outputs to parquet is better than writing them to text. But I see your point.

@lintool
Member Author

lintool commented Apr 9, 2018

I've pushed a prototype to branch df.

The following script works:

import io.archivesunleashed._
import io.archivesunleashed.df._

val df = RecordLoader.loadArchives("warcs/", sc)
  .extractValidPagesDF()

df.printSchema()

df.select(ExtractDomain($"Url").as("Domain"))
  .groupBy("Domain").count().orderBy(desc("count")).show()

@ianmilligan1 @greebie give it a try on some more substantial amount of data?

How does it compare to what we're currently doing?

@ianmilligan1
Member

Just ran on a slightly more substantial amount of data (6GB) and looks great. I'll run the same script on a much more substantial corpus now.


+--------------------+------+
|              Domain| count|
+--------------------+------+
|   www.equalvoice.ca|239122|
|       greenparty.ca| 37747|
|      www.liberal.ca| 12543|
|www.policyalterna...| 11096|
|          www.ndp.ca|  7458|
|        www.egale.ca|  6449|
|www.blocquebecois...|  6388|
|     www.fairvote.ca|  6356|
| www.canadiancrc.com|  2478|
| www.davidsuzuki.org|  1991|
|         www.ocap.ca|  1920|
| www.conservative.ca|  1901|
|   www.canadians.org|  1562|
|         www.ccsd.ca|  1388|
|     www.wegovern.ca|   647|
|            ccla.org|   630|
|        www.ccla.org|   471|
|     www.youtube.com|   309|
|          www.gca.ca|   306|
|         coat.ncf.ca|   209|
+--------------------+------+
only showing top 20 rows

@lintool would it be possible to use df to reproduce the plain text (say by domain and by date) and the hyperlinks? I'm just not familiar with the syntax enough to know off hand how to construct that, and you might be able to do so in your sleep.

@lintool
Member Author

lintool commented Apr 12, 2018

@ianmilligan1 not yet, I need to port the UDFs over. On my TODO list is to replicate all the standard derivatives using DFs.

@ianmilligan1
Member

Cool. This is great, and the df.select syntax seems straightforward and nicely modular to me.

@lintool
Member Author

lintool commented Apr 12, 2018

Yes, I believe the df syntax maps nicely over to FAAV. We can probably bring it into even better alignment with some custom DSL hacking.

@greebie
Contributor

greebie commented Apr 13, 2018

This works nicely. The run times are pretty close, with RDD slightly faster.

I could not import matchbox and df at the same time because the reference to ExtractDomain becomes ambiguous.
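
One way around this kind of clash, sketched here with stand-in objects rather than the real packages, is Scala's import renaming. `matchboxLike`, `dfLike`, and the alias `ExtractDomainRDD` are hypothetical names; with the real library the analogous line would presumably be `import io.archivesunleashed.matchbox.{ExtractDomain => ExtractDomainRDD, _}`, but that has not been tested against aut.

```scala
// Hypothetical stand-ins for the two packages; each defines its own ExtractDomain.
object matchboxLike {
  object ExtractDomain { def apply(url: String): String = "rdd:" + url }
}
object dfLike {
  object ExtractDomain { def apply(url: String): String = "df:" + url }
}

// Rename one on import; the wildcard after the rename brings in everything else
// from that package, but no longer under the name ExtractDomain.
import matchboxLike.{ExtractDomain => ExtractDomainRDD, _}
import dfLike._

val fromRdd = ExtractDomainRDD("example.org") // resolves to matchboxLike's version
val fromDf = ExtractDomain("example.org")     // resolves to dfLike's version

println(fromRdd)
println(fromDf)
```

With both names in scope under different identifiers, the RDD and DataFrame variants could be timed in the same session.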

import io.archivesunleashed._
import io.archivesunleashed.matchbox._
import java.time.Instant
val warcPath = "warcs/*.warc.gz"

def timed(f: => Unit) = {
  val start = System.currentTimeMillis()
  f
  val end = System.currentTimeMillis()
  println("Elapsed Time: " + (end - start))
}

timed {
  println("Get urls and count, taking 3.")
  val r = RecordLoader.loadArchives(warcPath, sc)
  .keepValidPages()
  .map (r => ExtractDomain(r.getUrl))
  .countItems()
  println(r.collect().deep.mkString("\n"))
}

// Exiting paste mode, now interpreting.

Get urls and count, taking 3.
(notstrategic.blogspot.ca,292)                                                  
(www.uvicfa.ca,73)
(uvfawhycertify.org,70)
(www.facebook.com,66)
(notstrategic.blogspot.com,63)
(www.theglobeandmail.com,35)
(www.blogger.com,22)
(beta.images.theglobeandmail.com,20)
(www.uvic.ca,19)
(twitter.com,12)
(www.martlet.ca,6)
(dublincore.org,5)
(sec.theglobeandmail.com,5)
(plus.google.com,4)
(thetyee.ca,4)
(mobile.twitter.com,4)
(accounts.google.com,3)
(web.adblade.com,3)
(maps.google.ca,2)
(purl.org,2)
(theglobeandmail.com,2)
(m.theglobeandmail.com,2)
(ow.ly,2)
(pixel.facebook.com,2)
(www.vicnews.com,2)
(d1z2jf7jlzjs58.cloudfront.net,2)
(sec.images.theglobeandmail.com,2)
(www.google.com,1)
(janniaragon.wordpress.com,1)
(googleads.g.doubleclick.net,1)
(martlet.ca,1)
(v1.theglobeandmail.com,1)
(btn.weather.ca,1)
(d.adgear.com,1)
(mail.google.com,1)
(www.cfax1070.com,1)
(globeandmail.com,1)
(cdns.gigya.com,1)
(www.blogblog.com,1)
(static.ak.facebook.com,1)
(janniaragon.me,1)
(player.cfax1070.com,1)
(platform.twitter.com,1)
(www.sfufa.ca,1)
(www.cautbulletin.ca,1)
(fonts.googleapis.com,1)
(gmpg.org,1)
(ype.youneeq.ca,1)
Elapsed Time: 4310
import io.archivesunleashed._
import io.archivesunleashed.matchbox._
import java.time.Instant
warcPath: String = warcs/*.warc.gz
timed: (f: => Unit)Unit

scala> :paste
// Entering paste mode (ctrl-D to finish)

import io.archivesunleashed._
import io.archivesunleashed.df._
import java.time.Instant

val warcPath = "warcs/*.warc.gz"

def timed(f: => Unit) = {
  val start = System.currentTimeMillis()
  f
  val end = System.currentTimeMillis()
  println("Elapsed Time: " + (end - start))
}

timed {
  println ("Testing dataframes");
  val df = RecordLoader.loadArchives(warcPath, sc)
    .extractValidPagesDF()
  df.select(ExtractDomain($"Url")
    .as("Domain")).groupBy("Domain")
    .count().orderBy(desc("count")).show()  
}

// Exiting paste mode, now interpreting.

Testing dataframes
+--------------------+-----+                                                    
|              Domain|count|
+--------------------+-----+
|notstrategic.blog...|  292|
|       www.uvicfa.ca|   73|
|  uvfawhycertify.org|   70|
|    www.facebook.com|   66|
|notstrategic.blog...|   63|
|www.theglobeandma...|   35|
|     www.blogger.com|   22|
|beta.images.thegl...|   20|
|         www.uvic.ca|   19|
|         twitter.com|   12|
|      www.martlet.ca|    6|
|sec.theglobeandma...|    5|
|      dublincore.org|    5|
|          thetyee.ca|    4|
|  mobile.twitter.com|    4|
|     plus.google.com|    4|
| accounts.google.com|    3|
|     web.adblade.com|    3|
|d1z2jf7jlzjs58.cl...|    2|
|      maps.google.ca|    2|
+--------------------+-----+
only showing top 20 rows

Elapsed Time: 4865
import io.archivesunleashed._
import io.archivesunleashed.df._
import java.time.Instant
warcPath: String = warcs/*.warc.gz
timed: (f: => Unit)Unit

@helgeho
Contributor

helgeho commented Apr 17, 2018

This data frame discussion is great 👍 But here is just a quick note / idea on that:

When you start from an ArchiveSpark RDD or you convert an AUT RDD using the bridge with .toArchiveSpark, you can apply our enrich functions, like HTML, Entities and others, e.g., by calling rdd.enrich(Html.first("title")), depending on what data you need, and finally, convert it into a JSON format with rdd.toJsonStrings(pretty = false). The produced JSON is fully compatible with Spark's data frames, so you can directly read it with .read.json and query the contained fields.
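
To make the last step concrete, here is a small sketch of loading JSON records into a DataFrame with Spark's own reader. The two records are hand-written and their field names (`url`, `title`) are hypothetical; they are not actual ArchiveSpark output, and the ArchiveSpark calls themselves are left out.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a throwaway local session; in spark-shell, `spark` already exists.
val spark = SparkSession.builder().master("local[1]").appName("json-sketch").getOrCreate()
import spark.implicits._

// Hand-written stand-ins for one-record-per-line JSON strings.
val jsonLines = Seq(
  """{"url": "http://www.archive.org/", "title": "Internet Archive"}""",
  """{"url": "http://www.loc.gov/", "title": "Library of Congress"}"""
).toDS()

// Spark infers the schema from the JSON fields.
val enriched = spark.read.json(jsonLines)
enriched.printSchema()

// Query a contained field like any other DataFrame column.
val titles = enriched.select("title").as[String].collect().toSet
```

For JSON saved to disk, `spark.read.json` also accepts a path instead of a `Dataset[String]`.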

Maybe it would be worth having a look at that and building your new UDFs on top of it instead of reinventing the wheel and writing your own conversions. This way we could reuse the idea of enrichments and help it grow + support data frames with UDFs to run queries on top of that.

Further, the pre-processed JSON dataset can also be saved and directly reloaded as a data frame, so ArchiveSpark can be used for corpus building here, and a reloaded corpus would be directly supported by your work on data frames. We haven't spent much work on data frames specifically yet, but it turns out this is a very easy way to integrate both approaches, without much additional work.

@lintool
Member Author

lintool commented Apr 18, 2018

Updated df branch. Now this works:

import io.archivesunleashed._
import io.archivesunleashed.df._

val df = RecordLoader.loadArchives("warcs/", sc)
  .extractHyperlinksDF()

df.printSchema()

df.select(RemovePrefixWWW(ExtractDomain($"Src")).as("SrcDomain"),
    RemovePrefixWWW(ExtractDomain($"Dest")).as("DestDomain"))
  .groupBy("SrcDomain", "DestDomain").count().orderBy(desc("SrcDomain")).show()

Result is something like this:

+---------------+--------------------+-----+                                    
|      SrcDomain|          DestDomain|count|
+---------------+--------------------+-----+
|web.adblade.com|     web.adblade.com|   18|
|web.adblade.com|smarterlifestyles...|    8|
|    vicnews.com|iservices.blackpr...|    2|
|    vicnews.com|albernivalleynews...|    2|
|    vicnews.com|       blackpress.ca|   14|
|    vicnews.com|         twitter.com|    6|
|    vicnews.com|       revweekly.com|    4|
|    vicnews.com|         vicnews.com|   35|
|    vicnews.com|       drivewaybc.ca|    4|
|    vicnews.com|         addthis.com|    2|
|    vicnews.com|placead.bcclassif...|    4|
|    vicnews.com|blogs.bclocalnews...|    4|
|    vicnews.com|       pinterest.com|    2|
|    vicnews.com|       mondaymag.com|    2|
|    vicnews.com|          paperg.com|    2|
|    vicnews.com| nanaimobulletin.com|    2|
|    vicnews.com|         pqbnews.com|    3|
|    vicnews.com| sookenewsmirror.com|    1|
|    vicnews.com|     bclocalnews.com|    8|
|    vicnews.com|        facebook.com|    4|
+---------------+--------------------+-----+
only showing top 20 rows

@ianmilligan1
Member

ianmilligan1 commented Apr 19, 2018

Late to this, but this is working quite nicely @lintool – here they are on a decently-large collection sorted by descending count.

+--------------------+--------------------+---------+
|           SrcDomain|          DestDomain|    count|
+--------------------+--------------------+---------+
|          liberal.ca|          liberal.ca|112375610|
|              ndp.ca|              ndp.ca| 27593036|
|policyalternative...|policyalternative...| 20066112|
|     davidsuzuki.org|     davidsuzuki.org| 17909044|
|              chp.ca|              chp.ca| 15486062|
|          liberal.ca|   action.liberal.ca| 14598706|
|            ccla.org|            ccla.org| 13258763|
|       equalvoice.ca|       equalvoice.ca| 10245075|
|canadianactionpar...|canadianactionpar...|  9896440|
|              afn.ca|              afn.ca|  8109879|
|       greenparty.ca|       greenparty.ca|  3402972|
|     canadiancrc.com|     canadiancrc.com|  2937523|
|       canadians.org|       canadians.org|  2750689|
|       equalvoice.ca|gettingtothegate.com|  2464993|
|          liberal.ca|         twitter.com|  2377754|
|       equalvoice.ca|           adobe.com|  2368464|
|          liberal.ca|   events.liberal.ca|  2234996|
|       equalvoice.ca|       snapdesign.ca|  2103383|
|       equalvoice.ca|          flickr.com|  2067749|
|       equalvoice.ca|         twitter.com|  2066463|
+--------------------+--------------------+---------+

What's the best way to export these to CSV files?
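
One common option, sketched here with Spark's built-in `DataFrameWriter` rather than anything specific to this project, is to write the DataFrame out with `.write.csv`. The sample rows and the temporary output directory are assumptions for illustration; `coalesce(1)` forces a single part file, which is convenient for small results but funnels everything through one task.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

// Assumption: a throwaway local session; in spark-shell, `spark` already exists.
val spark = SparkSession.builder().master("local[1]").appName("csv-sketch").getOrCreate()
import spark.implicits._

// Made-up rows shaped like the link-count table.
val links = Seq(
  ("liberal.ca", "liberal.ca", 3L),
  ("ndp.ca", "twitter.com", 1L)
).toDF("SrcDomain", "DestDomain", "count")

// By default Spark errors out if the target directory exists, so use a fresh sub-path.
val outDir = Files.createTempDirectory("links-csv").resolve("out").toString

// Writes a directory containing one part-*.csv file with a header row.
links.coalesce(1).write.option("header", "true").csv(outDir)

// Read the directory back to check what was written.
val roundTrip = spark.read.option("header", "true").csv(outDir)
roundTrip.show()
```

Note that the output path is a directory of part files, not a single named CSV file.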

@greebie
Contributor

greebie commented Apr 26, 2018

I was reading this http://sigdelta.com/blog/scala-spark-udfs-in-python/ and it looks like a good path forward. Create .callUdf() and .registerUdf() functions for the objects in aut and then they can be used in Python or Scala.

Then the main Python code can just be a bridge to aut which would reduce the long-term maintenance burden.

@SamFritz
Member

RD Lightning Talk + Discussion

We had a great discussion at the Toronto Datathon about data frames. Just some quick notes made during @lintool's lightning talk and feedback from the community:

The AUT team would like the community’s input regarding data frames. For a bit of context, the discussion revolves around whether there is interest in moving from RDDs to a data frame/table framework. We would like to gauge interest, as well as what this would mean for accessibility, impact, uptake, etc.

Community input will help to direct future project priorities and determine dev cycles. Ultimately we want to focus future work on features/items that will be useful and used.

Discussion/Points brought up by AUT Datathon Participants:

  • The question thrown out to the group: Should we treat AUT like a big SQL?
  • Moving to DF/SQL/Python would be appreciated; for example, it would mean that people could work with the dev teams without having to learn a completely different language.
  • Several participants agreed that working with Python would be ideal.
  • DF is easier to read visually
  • Ultimately: there is enthusiasm for moving towards SQL + Python data frames

Other points to consider:

  • Moving over to DF means deprecating RDD - we (AUT) won’t be able to create and support multiple iterations in parallel as a long-term solution
  • Big Win = into Pandas - then connector to Python (but there may be limitations with working with Pandas)
  • Even a raw python output would be great
  • Need to consider any limitations of DF compared to RDD

Further discussion from datathon re: DF to be documented

ruebot pushed a commit that referenced this issue Apr 27, 2018
* Initial stab at df.
* Initial stab of what link extraction would look like with DFs.
* Added test case.
* Docs.
@ruebot
Member

ruebot commented May 2, 2018

#214 has been merged now. #209 has some examples on how to use PySpark. If you use Spark 2.3.0, it doesn't require the .enableHiveSupport() hack.

FYI: @digitalshawn, @justinlittman

@jrwiebe
Contributor

jrwiebe commented Feb 1, 2019

What is to be the relationship between the class DataFrameLoader and the DataFrame UDFs? As we add UDFs (e.g., ExtractPDFLinks) should we also add methods to DataFrameLoader that resemble the existing ones – i.e., methods that take an archive path argument, which are wrappers around RecordLoader.loadArchives()?

@ianmilligan1
Member

^^^ @lintool ?

@ruebot
Member

ruebot commented Aug 17, 2019

@jrwiebe do you still need input from @lintool on this one?

@jrwiebe
Contributor

jrwiebe commented Aug 17, 2019

His input would be valuable. It gets to some of the points @SamFritz summarizes above: do we go all-in on DF, and deprecate RDD? Is anything important lost in doing so?

@ruebot
Member

ruebot commented Aug 17, 2019

I think it would just be hiding RDD from the user, and basically finishing migrating everything over. @lintool has a bit more in #231

...I'm still waiting for feedback from him on #223 too

@jrwiebe
Contributor

jrwiebe commented Aug 18, 2019

Couldn't stay away.

I was looking at the "resolve before 0.18.0" tag, and noticed @lintool's comment about how filtering, in the case of a certain issue, is being done on RDD, when our plan is eventually to move everything to DF. After my contributions of the past week I thought I'd see how much work would be needed to use DataFrames from the initial loadArchives call.

I created a branch that does this (54af833), and re-implements extractPDFDetailsDF using DataFrames end-to-end; the new method is extractPDFDetailsDF2. The use of _UDF methods looks clunky, but will seem less so if we convert most of our matchbox methods to UDF methods that operate on DF columns, as these do. Also, it's quite possible I've gone about things in the wrong way.

Demo

scala> :paste
// Entering paste mode (ctrl-D to finish)

import io.archivesunleashed._
import io.archivesunleashed.df._

val df = RecordLoader.loadArchivesDF("target/test-classes/warc/example.pdf.warc.gz", sc)

// Exiting paste mode, now interpreting.

import io.archivesunleashed._
import io.archivesunleashed.df._
df: org.apache.spark.sql.DataFrame = [archiveFilename: string, crawlDate: string ... 8 more fields]

scala> df.select("url", "mimeType").show(10)
+--------------------+--------------------+
|                 url|            mimeType|
+--------------------+--------------------+
|https://ajax.goog...|     text/javascript|
|https://yorkspace...|            text/css|
|https://yorkspace...|            text/css|
|https://yorkspace...|application/javas...|
|https://d39af2mgp...|application/javas...|
|https://yorkspace...|            text/css|
|https://yorkspace...|            text/css|
|https://yorkspace...|            text/css|
|https://yorkspace...|            text/css|
|http://d39af2mgp1...|            text/css|
+--------------------+--------------------+
only showing top 10 rows

scala> val pdfDeets = df.extractPDFDetailsDF2
19/08/18 16:41:41 WARN SparkSession$Builder: Using an existing SparkSession; some configuration may not take effect.
pdfDeets: org.apache.spark.sql.DataFrame = [url: string, filename: string ... 5 more fields]

scala> pdfDeets.show
19/08/18 16:41:49 WARN PDFParser: J2KImageReader not loaded. JPEG2000 files will not be processed.
See https://pdfbox.apache.org/2.0/dependencies.html#jai-image-io
for optional dependencies.

[Stage 2:>                                                          (0 + 1) / 1]19/08/18 16:41:49 WARN SQLite3Parser: org.xerial's sqlite-jdbc is not loaded.
Please provide the jar on your classpath to parse sqlite files.
See tika-parsers/pom.xml for the correct version.
+--------------------+--------------------+---------+-------------------+---------------+--------------------+--------------------+
|                 url|            filename|extension|mime_type_webserver| mime_type_tika|                 md5|         binaryBytes|
+--------------------+--------------------+---------+-------------------+---------------+--------------------+--------------------+
|https://yorkspace...|   cost-analysis.pdf|      pdf|    application/pdf|application/pdf|aaba59d2287afd40c...|[25 50 44 46 2D 3...|
|https://yorkspace...|JCDL%20-%20Cost%2...|      pdf|    application/pdf|application/pdf|322cd5239141408c4...|[25 50 44 46 2D 3...|
+--------------------+--------------------+---------+-------------------+---------------+--------------------+--------------------+

@ruebot
Member

ruebot commented Aug 19, 2019

Nice work!

My main goal here is to get @lintool's input before the 0.18.0 release, but not necessarily implement it all before then. I'm really looking for the correct path forward here, and on #231, which overlaps a fair bit. That way we have a focused path post 0.18.0 release. If we have this branch, I think that is a really good start, and hopefully will align with @lintool's thinking.

@jrwiebe
Contributor

jrwiebe commented Aug 19, 2019

That makes sense.

It turns out my loadArchivesDF is not ready for prime time. I ran the PDF extraction method against the data I was using to test #348, 14g of CPP WARCs, and it failed part way through with an out of memory error. I ran an even simpler script that applies a filter to a DF of WARCs, and that failed too (log).

import io.archivesunleashed._
import io.archivesunleashed.df._

val warcs_path = "/home/jrwiebe/warcs/cpp10/*.gz"
val output_path = "/tuna1/scratch/jrwiebe/get-pdf-test/df/"

val df = RecordLoader.loadArchivesDF(warcs_path, sc)

val count_www = df.filter(col("url").contains("www")).count

println(count_www + " URLs contain 'www'")

sys.exit

It seems that the VM's array size limit is reached when converting certain RDD records -- presumably ones containing large files -- to DataFrame rows. Searching for the problem I see recommendations about tweaking spark settings, such as parallelism and number of partitions, but I think we're actually limited by the number of WARC files we're dealing with, so I don't know that this would help. Anyway, it's something to start with.

ruebot added a commit that referenced this issue Aug 20, 2019
- Address #190
- Address #259
- Address #302
- Address #303
- Address #304
- Address #305
- Address #306
- Address #307
@lintool
Member Author

lintool commented Aug 21, 2019

We're good to close this issue. Re: large blobs overflowing memory, we should create a new issue when it really becomes a bottleneck.

ianmilligan1 pushed a commit that referenced this issue Aug 21, 2019
* Add binary extration DataFrames to PySpark.
- Address #190
- Address #259
- Address #302
- Address #303
- Address #304
- Address #305
- Address #306
- Address #307
- Resolves #350 
- Update README
@ruebot ruebot closed this as completed Nov 14, 2019