Implement Scala Matchbox UDFs in Python. (#463)
- Resolves #408
- Alphabetizes DataFrameLoader functions
- Alphabetizes UDF functions
- Moves DataFrameLoader to the df package
- Moves UDFs out of df into their own package
- Renames UDFs (no more DF tagged to the end)
- Updates tests as necessary
- Partially addresses #410 and #409
- Supersedes #412
ruebot authored May 19, 2020
1 parent 1d01571 commit 69007e2
Showing 19 changed files with 301 additions and 188 deletions.
4 changes: 2 additions & 2 deletions src/main/python/aut/__init__.py
@@ -1,4 +1,4 @@
 from aut.common import WebArchive
-from aut.udfs import extract_domain
+from aut.udfs import Udf
 
-__all__ = ["WebArchive", "extract_domain"]
+__all__ = ["WebArchive", "Udf"]
36 changes: 19 additions & 17 deletions src/main/python/aut/common.py
@@ -5,41 +5,43 @@ class WebArchive:
     def __init__(self, sc, sqlContext, path):
         self.sc = sc
         self.sqlContext = sqlContext
-        self.loader = sc._jvm.io.archivesunleashed.DataFrameLoader(sc._jsc.sc())
+        self.loader = sc._jvm.io.archivesunleashed.df.DataFrameLoader(sc._jsc.sc())
         self.path = path
 
     def all(self):
         return DataFrame(self.loader.all(self.path), self.sqlContext)
 
-    def webpages(self):
-        return DataFrame(self.loader.webpages(self.path), self.sqlContext)
+    def audio(self):
+        return DataFrame(self.loader.audio(self.path), self.sqlContext)
 
-    def links(self):
-        return DataFrame(self.loader.webgraph(self.path), self.sqlContext)
+    def image_graph(self):
+        return DataFrame(self.loader.imagegraph(self.path), self.sqlContext)
 
     def images(self):
         return DataFrame(self.loader.images(self.path), self.sqlContext)
 
-    def image_links(self):
-        return DataFrame(self.loader.imagegraph(self.path), self.sqlContext)
-
     def pdfs(self):
         return DataFrame(self.loader.pdfs(self.path), self.sqlContext)
 
-    def audio(self):
-        return DataFrame(self.loader.audio(self.path), self.sqlContext)
+    def presentation_program(self):
+        return DataFrame(
+            self.loader.presentationProgramFiles(self.path), self.sqlContext
+        )
+
+    def spreadsheets(self):
+        return DataFrame(self.loader.spreadsheets(self.path), self.sqlContext)
+
+    def text_files(self):
+        return DataFrame(self.loader.textFiles(self.path), self.sqlContext)
 
     def video(self):
         return DataFrame(self.loader.videos(self.path), self.sqlContext)
 
-    def spreadsheets(self):
-        return DataFrame(self.loader.spreadsheets(self.path), self.sqlContext)
+    def webgraph(self):
+        return DataFrame(self.loader.webgraph(self.path), self.sqlContext)
 
-    def presentation_program(self):
-        return DataFrame(self.loader.presentationProgramFiles(self.path), self.sqlContext)
+    def webpages(self):
+        return DataFrame(self.loader.webpages(self.path), self.sqlContext)
 
     def word_processor(self):
         return DataFrame(self.loader.wordProcessorFiles(self.path), self.sqlContext)
-
-    def text_files(self):
-        return DataFrame(self.loader.textFiles(self.path), self.sqlContext)
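
The loader methods are now alphabetized, and two were renamed along the way: links() is now webgraph() and image_links() is now image_graph(). A sketch of the calls, reusing the sc/sqlContext setup and placeholder path from the snippet above:

from aut import WebArchive

aw = WebArchive(sc, sqlContext, "/path/to/warcs")

webpages = aw.webpages()        # page records
webgraph = aw.webgraph()        # formerly aw.links()
image_graph = aw.image_graph()  # formerly aw.image_links()

webpages.printSchema()
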
120 changes: 111 additions & 9 deletions src/main/python/aut/udfs.py
@@ -1,13 +1,115 @@
-from pyspark.sql.functions import udf
-from pyspark.sql.types import StringType
+from pyspark import SparkContext
+from pyspark.sql.column import Column, _to_java_column, _to_seq
+from pyspark.sql.functions import col
 
 
-def extract_domain_func(url):
-    url = url.replace("http://", "").replace("https://", "")
-    if "/" in url:
-        return url.split("/")[0].replace("www.", "")
-    else:
-        return url.replace("www.", "")
+class Udf:
+    def compute_image_size(col):
+        sc = SparkContext.getOrCreate()
+        udf = (
+            sc.getOrCreate()
+            ._jvm.io.archivesunleashed.udfs.package.computeImageSize()
+            .apply
+        )
+        return Column(udf(_to_seq(sc, [col], _to_java_column)))
+
+    def compute_md5(col):
+        sc = SparkContext.getOrCreate()
+        udf = sc.getOrCreate()._jvm.io.archivesunleashed.udfs.package.computeMD5().apply
+        return Column(udf(_to_seq(sc, [col], _to_java_column)))
 
-extract_domain = udf(extract_domain_func, StringType())
+    def compute_sha1(col):
+        sc = SparkContext.getOrCreate()
+        udf = (
+            sc.getOrCreate()._jvm.io.archivesunleashed.udfs.package.computeSHA1().apply
+        )
+        return Column(udf(_to_seq(sc, [col], _to_java_column)))
+
+    def detect_language(col):
+        sc = SparkContext.getOrCreate()
+        udf = (
+            sc.getOrCreate()
+            ._jvm.io.archivesunleashed.udfs.package.detectLanguage()
+            .apply
+        )
+        return Column(udf(_to_seq(sc, [col], _to_java_column)))
+
+    def detect_mime_type_tika(col):
+        sc = SparkContext.getOrCreate()
+        udf = (
+            sc.getOrCreate()
+            ._jvm.io.archivesunleashed.udfs.package.detectMimeTypeTika()
+            .apply
+        )
+        return Column(udf(_to_seq(sc, [col], _to_java_column)))
+
+    def extract_boilerplate(col):
+        sc = SparkContext.getOrCreate()
+        udf = (
+            sc.getOrCreate()
+            ._jvm.io.archivesunleashed.udfs.package.extractBoilerpipeText()
+            .apply
+        )
+        return Column(udf(_to_seq(sc, [col], _to_java_column)))
+
+    def extract_date(col, dates):
+        sc = SparkContext.getOrCreate()
+        udf = (
+            sc.getOrCreate()._jvm.io.archivesunleashed.udfs.package.extractDate().apply
+        )
+        return Column(udf(_to_seq(sc, [col, dates], _to_java_column)))
+
+    def extract_domain(col):
+        sc = SparkContext.getOrCreate()
+        udf = (
+            sc.getOrCreate()
+            ._jvm.io.archivesunleashed.udfs.package.extractDomain()
+            .apply
+        )
+        return Column(udf(_to_seq(sc, [col], _to_java_column)))
+
+    def extract_image_links(col, image_links):
+        sc = SparkContext.getOrCreate()
+        udf = (
+            sc.getOrCreate()
+            ._jvm.io.archivesunleashed.udfs.package.extractImageLinks()
+            .apply
+        )
+        return Column(udf(_to_seq(sc, [col, image_links], _to_java_column)))
+
+    def extract_links(col, links):
+        sc = SparkContext.getOrCreate()
+        udf = (
+            sc.getOrCreate()._jvm.io.archivesunleashed.udfs.package.extractLinks().apply
+        )
+        return Column(udf(_to_seq(sc, [col, links], _to_java_column)))
+
+    def get_extension_mime(col, mime):
+        sc = SparkContext.getOrCreate()
+        udf = (
+            sc.getOrCreate()._jvm.io.archivesunleashed.udfs.package.getExtensionMime().apply
+        )
+        return Column(udf(_to_seq(sc, [col, mime], _to_java_column)))
+
+    def remove_http_header(col):
+        sc = SparkContext.getOrCreate()
+        udf = (
+            sc.getOrCreate()
+            ._jvm.io.archivesunleashed.udfs.package.removeHTTPHeader()
+            .apply
+        )
+        return Column(udf(_to_seq(sc, [col], _to_java_column)))
+
+    def remove_html(col):
+        sc = SparkContext.getOrCreate()
+        udf = sc.getOrCreate()._jvm.io.archivesunleashed.udfs.package.removeHTML().apply
+        return Column(udf(_to_seq(sc, [col], _to_java_column)))
+
+    def remove_prefix_www(col):
+        sc = SparkContext.getOrCreate()
+        udf = (
+            sc.getOrCreate()
+            ._jvm.io.archivesunleashed.udfs.package.removePrefixWWW()
+            .apply
+        )
+        return Column(udf(_to_seq(sc, [col], _to_java_column)))
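
Each wrapper fetches the corresponding JVM-side UDF over Py4J and applies it to the given Column, so the Scala implementations do the actual work. A sketch of chaining a few of them, under the same shell setup and placeholder path as above:

from aut import WebArchive, Udf
from pyspark.sql.functions import col

aw = WebArchive(sc, sqlContext, "/path/to/warcs")

# Strip the HTTP headers and HTML before guessing each page's language.
aw.webpages() \
  .select(Udf.detect_language(
      Udf.remove_html(Udf.remove_http_header(col("content")))).alias("language")) \
  .show()
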
src/main/scala/io/archivesunleashed/app/DomainFrequencyExtractor.scala
@@ -16,8 +16,9 @@
 
 package io.archivesunleashed.app
 
-import io.archivesunleashed.df.{ExtractDomainDF, RemovePrefixWWWDF}
-import io.archivesunleashed.{ArchiveRecord, DataFrameLoader}
+import io.archivesunleashed.ArchiveRecord
+import io.archivesunleashed.df.DataFrameLoader
+import io.archivesunleashed.udfs.{extractDomain, removePrefixWWW}
 import org.apache.spark.sql.functions.desc
 import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
 
@@ -32,7 +33,7 @@ object DomainFrequencyExtractor {
     // scalastyle:off
     import spark.implicits._
     // scalastyle:on
-    d.groupBy(RemovePrefixWWWDF(ExtractDomainDF($"url")).as("domain"))
+    d.groupBy(removePrefixWWW(extractDomain($"url")).as("domain"))
       .count()
       .sort($"count".desc)
   }
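
With the new bindings the same domain-frequency query can be written from Python; a sketch mirroring DomainFrequencyExtractor, under the same assumed setup:

from aut import WebArchive, Udf
from pyspark.sql.functions import col

aw = WebArchive(sc, sqlContext, "/path/to/warcs")

aw.webpages() \
  .groupBy(Udf.remove_prefix_www(Udf.extract_domain(col("url"))).alias("domain")) \
  .count() \
  .sort(col("count").desc()) \
  .show()
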
src/main/scala/io/archivesunleashed/app/DomainGraphExtractor.scala
@@ -16,8 +16,9 @@
 
 package io.archivesunleashed.app
 
-import io.archivesunleashed.df.{ExtractDomainDF, RemovePrefixWWWDF}
-import io.archivesunleashed.{ArchiveRecord, DataFrameLoader}
+import io.archivesunleashed.ArchiveRecord
+import io.archivesunleashed.df.DataFrameLoader
+import io.archivesunleashed.udfs.{extractDomain, removePrefixWWW}
 import org.apache.spark.sql.functions.desc
 import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
 
@@ -34,8 +35,8 @@ object DomainGraphExtractor {
     // scalastyle:on
     d.groupBy(
       $"crawl_date",
-      RemovePrefixWWWDF(ExtractDomainDF($"src")).as("src_domain"),
-      RemovePrefixWWWDF(ExtractDomainDF($"dest")).as("dest_domain"))
+      removePrefixWWW(extractDomain($"src")).as("src_domain"),
+      removePrefixWWW(extractDomain($"dest")).as("dest_domain"))
       .count()
       .filter(!($"dest_domain"===""))
       .filter(!($"src_domain"===""))
24 changes: 12 additions & 12 deletions src/main/scala/io/archivesunleashed/app/ExtractImageDetailsDF.scala
@@ -15,15 +15,15 @@
  */
 package io.archivesunleashed.app
 
-import io.archivesunleashed.{DataFrameLoader}
-import org.apache.commons.io.FilenameUtils
-import io.archivesunleashed.df.{ComputeImageSizeDF, ComputeMD5DF, ComputeSHA1DF,
-  GetExtensionMimeDF}
-import org.apache.hadoop.fs.{Path}
-import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
-import org.apache.spark.sql.functions.udf
+import io.archivesunleashed.df.DataFrameLoader
+import io.archivesunleashed.udfs.{computeImageSize, computeMD5, computeSHA1,
+  getExtensionMime}
 import java.net.URL
 import java.util.Base64
+import org.apache.commons.io.FilenameUtils
+import org.apache.hadoop.fs.Path
+import org.apache.spark.sql.functions.udf
+import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
 
 /** Extracts image details given raw bytes. */
 object ExtractImageDetailsDF {
@@ -50,13 +50,13 @@ object ExtractImageDetailsDF {
     d.select($"crawl_date",
       $"url",
       filename($"url").as("filename"),
-      GetExtensionMimeDF(urlClass($"url") ,$"mime_type_tika").as("extension"),
+      getExtensionMime(urlClass($"url") ,$"mime_type_tika").as("extension"),
       $"mime_type_web_server",
       $"mime_type_tika",
-      width(ComputeImageSizeDF($"bytes")).as("width"),
-      height(ComputeImageSizeDF($"bytes")).as("height"),
-      ComputeMD5DF($"bytes").as("MD5"),
-      ComputeSHA1DF($"bytes").as("SHA1"),
+      width(computeImageSize($"bytes")).as("width"),
+      height(computeImageSize($"bytes")).as("height"),
+      computeMD5($"bytes").as("MD5"),
+      computeSHA1($"bytes").as("SHA1"),
       body($"bytes").as("body"))
   }
 }
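
A rough Python counterpart is possible as well; a sketch that computes checksums over image bytes with the new wrappers, assuming the images DataFrame exposes the url and bytes columns that ExtractImageDetailsDF reads:

from aut import WebArchive, Udf
from pyspark.sql.functions import col

aw = WebArchive(sc, sqlContext, "/path/to/warcs")

aw.images() \
  .select(col("url"),
          Udf.compute_md5(col("bytes")).alias("md5"),
          Udf.compute_sha1(col("bytes")).alias("sha1")) \
  .show()
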
src/main/scala/io/archivesunleashed/app/ImageGraphExtractor.scala
@@ -16,7 +16,8 @@
 
 package io.archivesunleashed.app
 
-import io.archivesunleashed.{ArchiveRecord, DataFrameLoader}
+import io.archivesunleashed.ArchiveRecord
+import io.archivesunleashed.df.DataFrameLoader
 import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
 
 object ImageGraphExtractor {
src/main/scala/io/archivesunleashed/app/PlainTextExtractor.scala
@@ -17,7 +17,7 @@
 package io.archivesunleashed.app
 
 import io.archivesunleashed.ArchiveRecord
-import io.archivesunleashed.df.{ExtractBoilerpipeTextDF}
+import io.archivesunleashed.udfs.{extractBoilerpipeText}
 import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
 
 object PlainTextExtractor {
@@ -31,6 +31,6 @@ object PlainTextExtractor {
     // scalastyle:off
     import spark.implicits._
     // scalastyle:on
-    d.select(ExtractBoilerpipeTextDF($"content").as("content"))
+    d.select(extractBoilerpipeText($"content").as("content"))
   }
 }
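
The Python equivalent via the new wrapper, under the same assumptions:

from aut import WebArchive, Udf
from pyspark.sql.functions import col

aw = WebArchive(sc, sqlContext, "/path/to/warcs")

aw.webpages() \
  .select(Udf.extract_boilerplate(col("content")).alias("content")) \
  .show()
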
src/main/scala/io/archivesunleashed/app/WebGraphExtractor.scala
@@ -16,7 +16,8 @@
 
 package io.archivesunleashed.app
 
-import io.archivesunleashed.{ArchiveRecord, DataFrameLoader}
+import io.archivesunleashed.ArchiveRecord
+import io.archivesunleashed.df.DataFrameLoader
 import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
 
 object WebGraphExtractor {
src/main/scala/io/archivesunleashed/app/WebPagesExtractor.scala
@@ -17,8 +17,8 @@
 package io.archivesunleashed.app
 
 import io.archivesunleashed.ArchiveRecord
-import io.archivesunleashed.df.{ExtractDomainDF, RemoveHTMLDF,
-  RemoveHTTPHeaderDF, RemovePrefixWWWDF}
+import io.archivesunleashed.udfs.{extractDomain, removeHTML,
+  removeHTTPHeader, removePrefixWWW}
 import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
 
 object WebPagesExtractor {
@@ -34,11 +34,11 @@ object WebPagesExtractor {
     import spark.implicits._
     // scalastyle:on
     d.select($"crawl_date",
-      RemovePrefixWWWDF(ExtractDomainDF($"url")).as("domain"),
+      removePrefixWWW(extractDomain($"url")).as("domain"),
       $"url",
       $"mime_type_web_server",
       $"mime_type_tika",
       $"language",
-      RemoveHTMLDF(RemoveHTTPHeaderDF(($"content"))).alias("content"))
+      removeHTML(removeHTTPHeader(($"content"))).alias("content"))
   }
 }
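
The full webpages extraction now reads almost identically from Python; a sketch under the same assumed setup, with column names as produced by the webpages loader:

from aut import WebArchive, Udf
from pyspark.sql.functions import col

aw = WebArchive(sc, sqlContext, "/path/to/warcs")

aw.webpages() \
  .select(col("crawl_date"),
          Udf.remove_prefix_www(Udf.extract_domain(col("url"))).alias("domain"),
          col("url"),
          Udf.remove_html(Udf.remove_http_header(col("content"))).alias("content")) \
  .show()
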
(Diffs for the remaining changed files are not shown.)