Add ExtractGraphX including algorithms for PageRank and Components. Issue 203 (#245)

* pom.xml change for GraphX
* Changes for GraphXSLS
* Changes for SLS graph
* Changes for GraphX
* Changes for converting WARC RDD to GraphX object
* Rename extractor to ExtractGraphX
* Various lint fixes (usually Magic Numbers)
* Remove illegal imports from scalastyle (we use wildcard imports a lot)
* Add WriteGraphXMLTest.
greebie authored and ianmilligan1 committed Jul 29, 2018
1 parent 290b6aa commit afe9254
Showing 16 changed files with 421 additions and 38 deletions.
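To see how the new pieces fit together, here is a minimal sketch of the intended flow: pull (source, destination) link pairs out of an archive, build the GraphX graph, run the PageRank/components pass, and export GraphML. It assumes the toolkit's existing RecordLoader and ExtractLinks entry points and a spark-shell sc; the WARC and output paths are placeholders.

import io.archivesunleashed._
import io.archivesunleashed.app.{ExtractGraphX, WriteGraphXML}
import io.archivesunleashed.matchbox.ExtractLinks

// Placeholder input: a directory of ARC/WARC files.
val links = RecordLoader.loadArchives("/path/to/warcs", sc)
  .keepValidPages()
  .flatMap(r => ExtractLinks(r.getUrl, r.getContentString))
  .map(link => (link._1, link._2))                       // keep (source, destination)
  .filter(link => link._1.nonEmpty && link._2.nonEmpty)  // drop empty endpoints

// Build the graph, score it, and write GraphML for tools such as Gephi.
val graph = ExtractGraphX.extractGraphX(links)
val scored = ExtractGraphX.runPageRankAlgorithm(graph)
WriteGraphXML(scored, "/path/to/output.graphml")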
4 changes: 2 additions & 2 deletions src/main/scala/io/archivesunleashed/ArchiveRecord.scala
@@ -98,14 +98,14 @@ class ArchiveRecordImpl(r: SerializableWritable[ArchiveRecordWritable]) extends
new String(getContentBytes)
}

val getMimeType = {
val getMimeType: String = {
if (r.t.getFormat == ArchiveRecordWritable.ArchiveFormat.ARC)
arcRecord.getMetaData.getMimetype
else
WarcRecordUtils.getWarcResponseMimeType(getContentBytes)
}

val getUrl = {
val getUrl: String = {
if (r.t.getFormat == ArchiveRecordWritable.ArchiveFormat.ARC)
arcRecord.getMetaData.getUrl
else
src/main/scala/io/archivesunleashed/app/DomainFrequencyExtractor.scala
@@ -17,7 +17,7 @@

package io.archivesunleashed.app

import io.archivesunleashed._
import io.archivesunleashed.{ArchiveRecord, DataFrameLoader, CountableRDD}
import io.archivesunleashed.matchbox
import io.archivesunleashed.df
import org.apache.spark.rdd.RDD
@@ -30,7 +30,7 @@ object DomainFrequencyExtractor {
* @param records RDD[ArchiveRecord] obtained from RecordLoader
* @return RDD[(String,Int))], which holds (DomainName, DomainFrequency)
*/
def apply(records: RDD[ArchiveRecord]) = {
def apply(records: RDD[ArchiveRecord]): RDD[(String, Int)] = {
records
.keepValidPages()
.map(r => matchbox.ExtractDomain(r.getUrl))
@@ -44,7 +44,9 @@
*/
def apply(d: DataFrame): Dataset[Row] = {
val spark = SparkSession.builder().master("local").getOrCreate()
import spark.implicits._
// scalastyle:off
import spark.implicits._
// scalastyle:on

d.select(df.ExtractBaseDomain($"Url").as("Domain"))
.groupBy("Domain").count().orderBy(desc("count"))
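With the return type now explicit, the RDD flavour of this extractor is easy to call end to end; a small sketch, assuming the toolkit's RecordLoader and a spark-shell sc, with placeholder paths:

import io.archivesunleashed._
import io.archivesunleashed.app.DomainFrequencyExtractor

// keepValidPages() is applied inside the extractor, so the raw record RDD is enough.
val records = RecordLoader.loadArchives("/path/to/warcs", sc)
val counts = DomainFrequencyExtractor(records)   // RDD[(String, Int)]: (domain, frequency)
counts.saveAsTextFile("/path/to/domain-frequency")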
src/main/scala/io/archivesunleashed/app/DomainGraphExtractor.scala
@@ -17,7 +17,7 @@

package io.archivesunleashed.app

import io.archivesunleashed._
import io.archivesunleashed.{ArchiveRecord, DataFrameLoader, CountableRDD}
import io.archivesunleashed.matchbox.{ExtractDomain, ExtractLinks}
import io.archivesunleashed.df
import org.apache.spark.rdd.RDD
@@ -31,7 +31,7 @@ object DomainGraphExtractor {
* @return RDD[(String, String, String), Int],
* which holds ((CrawlDate, SourceDomain, DestinationDomain), Frequency)
*/
def apply(records: RDD[ArchiveRecord]) = {
def apply(records: RDD[ArchiveRecord]): RDD[((String, String, String), Int)] = {
records
.keepValidPages()
.map(r => (r.getCrawlDate, ExtractLinks(r.getUrl, r.getContentString)))
@@ -52,8 +52,9 @@
*/
def apply(d: DataFrame): Dataset[Row] = {
val spark = SparkSession.builder().master("local").getOrCreate()
// scalastyle:off
import spark.implicits._

// scalastyle:on
d.select($"CrawlDate",
df.RemovePrefixWWW(df.ExtractBaseDomain($"Src")).as("SrcDomain"),
df.RemovePrefixWWW(df.ExtractBaseDomain($"Dest")).as("DestDomain"))
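The DataFrame flavour only needs CrawlDate, Src, and Dest columns, so it can be exercised with a hand-built frame; a self-contained sketch with illustrative values:

import org.apache.spark.sql.SparkSession
import io.archivesunleashed.app.DomainGraphExtractor

val spark = SparkSession.builder().master("local").getOrCreate()
import spark.implicits._

// Toy link rows; in practice these come from the toolkit's link extraction.
val links = Seq(
  ("20180729", "http://example.com/a", "http://www.example.org/b"),
  ("20180729", "http://example.com/c", "http://www.example.org/d")
).toDF("CrawlDate", "Src", "Dest")

// Rows of crawl date plus source/destination domains with their frequencies.
DomainGraphExtractor(links).show()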
11 changes: 7 additions & 4 deletions src/main/scala/io/archivesunleashed/app/ExtractGraph.scala
@@ -16,14 +16,17 @@
*/
package io.archivesunleashed.app

import io.archivesunleashed._
import io.archivesunleashed.matchbox._
import io.archivesunleashed.ArchiveRecord
import io.archivesunleashed.matchbox.{ExtractLinks, ExtractDomain, WWWLink}
import io.archivesunleashed.util.JsonUtils
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

/** Extracts a network graph using Spark's GraphX utility. */
@deprecated("Use ExtractGraphX instead.", "0.16.1")
object ExtractGraph {
val TOLERANCE: Double = 0.005
val NUM_ITER: Int = 20

/** Creates a hashcode from a url to use as a unique id.
*
@@ -48,7 +51,7 @@ object ExtractGraph {
* @return a Graph object containing data for vertices and edges as extracted.
*/
def apply(records: RDD[ArchiveRecord], dynamic: Boolean = false,
tolerance: Double = 0.005, numIter: Int = 20): Graph[VertexData, EdgeData] = {
tolerance: Double = TOLERANCE, numIter: Int = NUM_ITER): Graph[VertexData, EdgeData] = {
val extractedLinks = records.keepValidPages()
.map(r => (r.getCrawlDate, ExtractLinks(r.getUrl, r.getContentString)))
.flatMap(r => r._2.map(f => (r._1, ExtractDomain(f._1).removePrefixWWW(), ExtractDomain(f._2).removePrefixWWW())))
@@ -94,7 +97,7 @@ object ExtractGraph {
* @param edgesPath Filepath for edges output
* @return Unit().
*/
def writeAsJson(verticesPath: String, edgesPath: String) = {
def writeAsJson(verticesPath: String, edgesPath: String): Unit = {
// Combine edges of a given (date, src, dst) combination into single record with count value.
val edgesCounted = graph.edges.countItems().map {
r => Map("date" -> r._1.attr.date,
95 changes: 95 additions & 0 deletions src/main/scala/io/archivesunleashed/app/ExtractGraphX.scala
@@ -0,0 +1,95 @@
/*
* Archives Unleashed Toolkit (AUT):
* An open-source platform for analyzing web archives.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.archivesunleashed.app

import io.archivesunleashed._
import io.archivesunleashed.matchbox._
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

/** Extracts a site link structure using Spark's GraphX utility. */
object ExtractGraphX {

/** Creates a hashcode from a url to use as a unique id.
*
* @param url the url to convert into a unique vertex id
* @return unique id as long integer.
*/
def pageHash(url: String): VertexId = {
url.hashCode.toLong
}
val DEF_TOLERANCE = 0.005
val DEF_NUMITER = 20
val DEF_RESET = 0.15
/* convergence tolerance for dynamic PageRank (smaller values run longer) */
var tolerance: Double = DEF_TOLERANCE
/* number of iterations for page rank and strongly connected components */
var numIter: Int = DEF_NUMITER
/* probability page rank "walk" will start from a random position */
var resetProbability: Double = DEF_RESET
/* whether to calculate dynamic page rank */
var dynamic: Boolean = false
/* default page rank value */
val defaultPR: Double = 0.0
val defaultComponent: Long = 0
case class VertexData(url: String)
case class EdgeData(edgeCount: Int)
case class VertexDataPR(url: String, pageRank: Double, weak: Long, strong: Long)

/** Creates a GraphX object
*
* @param records an RDD of tuples (source, destination)
* @return a GraphX object
*/
def extractGraphX(records: RDD[(String, String)]): Graph[VertexData, EdgeData] = {
val extractedLinks = records.persist()
val vertices: RDD[(VertexId, VertexData)] = extractedLinks
.flatMap(r => List(r._1, r._2))
.distinct
.map(r => (pageHash(r), VertexData(r)))
val edges: RDD[Edge[EdgeData]] = extractedLinks
.map(r => Edge(pageHash(r._1), pageHash(r._2), EdgeData(1)))
val graph = Graph(vertices, edges)
.partitionBy(PartitionStrategy.RandomVertexCut)
.groupEdges((e1,e2) => EdgeData(e1.edgeCount + e2.edgeCount))
graph
}

/** Calculates basic graph data (PageRank, weak and strong components) for a graph.
*
* @param graph GraphX object
* @return new graph object with additional attributes
*/
def runPageRankAlgorithm(graph: Graph[VertexData, EdgeData]): Graph[VertexDataPR, EdgeData] = {
if(dynamic){
graph.outerJoinVertices(graph.pageRank(tolerance, resetProbability).vertices) {
case (id, vd, pr) => (vd, pr)}
.outerJoinVertices(graph.connectedComponents().vertices) {
case (id, (vd, pr), cc) => (vd, pr, cc)}
.outerJoinVertices(graph.stronglyConnectedComponents(numIter).vertices) {
case (id, (vd, pr, cc), scc) => VertexDataPR(vd.url, pr.getOrElse(defaultPR), cc.getOrElse(defaultComponent), scc.getOrElse(defaultComponent)) }
}
else{
graph.outerJoinVertices(graph.staticPageRank(numIter, resetProbability).vertices){
case (id, vd, pr) => (vd, pr)
}.outerJoinVertices(graph.connectedComponents().vertices) {
case (id, (vd, pr), cc) => (vd, pr, cc)}
.outerJoinVertices(graph.stronglyConnectedComponents(numIter).vertices) {
case (id, (vd, pr, cc), scc) => VertexDataPR(vd.url, pr.getOrElse(defaultPR), cc.getOrElse(defaultComponent), scc.getOrElse(defaultComponent)) }
}
}
}
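Because tolerance, numIter, resetProbability, and dynamic are mutable fields on the ExtractGraphX object, they can be adjusted before running the algorithm; a sketch with illustrative values, assuming graph came from extractGraphX above:

import io.archivesunleashed.app.ExtractGraphX

// Switch to dynamic PageRank with a tighter convergence tolerance.
ExtractGraphX.dynamic = true
ExtractGraphX.tolerance = 0.001
ExtractGraphX.resetProbability = 0.15
ExtractGraphX.numIter = 10  // used for the strongly connected components pass (and static PageRank)

val scored = ExtractGraphX.runPageRankAlgorithm(graph)

Keeping these as object-level vars is convenient in a notebook or spark-shell session, though two jobs sharing a JVM would also share the settings.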
src/main/scala/io/archivesunleashed/app/ExtractPopularImages.scala
@@ -23,6 +23,10 @@ import org.apache.spark.{RangePartitioner, SparkContext}

/** Extract most popular images from an RDD. */
object ExtractPopularImages {
val LIMIT_MAXIMUM: Int = 500
val LIMIT_DENOMINATOR: Int = 250
val MIN_WIDTH: Int = 30
val MIN_HEIGHT: Int = 30

/** Extracts the <i>n</i> most popular images from an RDD within a given size range.
*
@@ -32,7 +36,7 @@ object ExtractPopularImages {
* @param minWidth of image
* @param minHeight of image
*/
def apply(records: RDD[ArchiveRecord], limit: Int, sc:SparkContext, minWidth: Int = 30, minHeight: Int = 30) = {
def apply(records: RDD[ArchiveRecord], limit: Int, sc:SparkContext, minWidth: Int = MIN_WIDTH, minHeight: Int = MIN_HEIGHT): RDD[String] = {
val res = records
.keepImages()
.map(r => ((r.getUrl, r.getImageBytes), 1))
@@ -42,7 +46,7 @@ object ExtractPopularImages {
.map(x=> (x._2._3, x._2._2))
.takeOrdered(limit)(Ordering[Int].on(x => -x._1))
res.foreach(x => println(x._1 + "\t" + x._2))
val numPartitions = if (limit <= 500) 1 else Math.ceil(limit / 250).toInt
val numPartitions = if (limit <= LIMIT_MAXIMUM) 1 else Math.ceil(limit / LIMIT_DENOMINATOR).toInt
val rdd = sc.parallelize(res)
rdd.repartitionAndSortWithinPartitions(
new RangePartitioner(numPartitions, rdd, false)).sortByKey(false).map(x=>x._1 + "\t" + x._2)
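With the magic numbers named, the call site is unchanged; a usage sketch, assuming RecordLoader and a spark-shell sc, with placeholder paths and an illustrative limit:

import io.archivesunleashed._
import io.archivesunleashed.app.ExtractPopularImages

// Rank images by how often the same (url, bytes) pair recurs and keep the top 100.
val records = RecordLoader.loadArchives("/path/to/warcs", sc)
val popular = ExtractPopularImages(records, 100, sc)   // RDD[String], one image per line
popular.saveAsTextFile("/path/to/popular-images")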
2 changes: 1 addition & 1 deletion src/main/scala/io/archivesunleashed/app/WriteGraphML.scala
@@ -71,6 +71,6 @@ object WriteGraphML {
outFile.write("</graph>\n" +
"</graphml>")
outFile.close()
return true
true
}
}
95 changes: 95 additions & 0 deletions src/main/scala/io/archivesunleashed/app/WriteGraphXML.scala
@@ -0,0 +1,95 @@
/*
* Archives Unleashed Toolkit (AUT):
* An open-source platform for analyzing web archives.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.archivesunleashed.app

import io.archivesunleashed.matchbox._

import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}
import io.archivesunleashed.app.ExtractGraphX.{VertexData,EdgeData,VertexDataPR}
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

/**
* UDF for exporting a GraphX object representing a collection of links to a GraphML file.
*/
object WriteGraphXML {
/** Writes GraphX object nodes and edges to a GraphML file.
*
* @param graph GraphX object of type Graph[VertexDataPR, EdgeData]
* @param graphmlPath output file
*/
def apply(graph: Graph[VertexDataPR, EdgeData], graphmlPath: String): Boolean = {
if (graphmlPath.isEmpty()) {
false
} else {
makeFile (graph, graphmlPath)
}
}

/** Produces the GraphML output from a GraphX object and outputs it to graphmlPath.
*
* @param graph GraphX object of type Graph[VertexDataPR, EdgeData]
* @param graphmlPath output file
* @return true on successful run.
*/
def makeFile (graph: Graph[VertexDataPR, EdgeData], graphmlPath: String): Boolean = {
val outFile = Files.newBufferedWriter(Paths.get(graphmlPath), StandardCharsets.UTF_8)

val END_DATA_XML: String = "</data>\n"

val mappedGraph = graph.triplets.map(triplet => List("<edge source=\"" + triplet.srcId + "\" target=\"" +
triplet.dstId + "\" type=\"directed\">\n" +
"<data key=\"weight\">" + triplet.attr.edgeCount + END_DATA_XML +
"</edge>\n", "<node id=\"" + triplet.srcId + "\">\n" +
"<data key=\"pageRank\">" + triplet.srcAttr.pageRank + END_DATA_XML +
"<data key=\"connectedComponent\">" + triplet.srcAttr.weak + END_DATA_XML +
"<data key=\"stronglyConnectedComponent\">" + triplet.srcAttr.strong + END_DATA_XML +
"<data key=\"label\">" + triplet.srcAttr.url + END_DATA_XML + "</node>\n",
"<node id=\"" + triplet.dstId + "\">\n" +
"<data key=\"pageRank\">" + triplet.dstAttr.pageRank + END_DATA_XML +
"<data key=\"connectedComponent\">" + triplet.dstAttr.weak + END_DATA_XML +
"<data key=\"stronglyConnectedComponent\">" + triplet.dstAttr.strong + END_DATA_XML +
"<data key=\"label\">" + triplet.dstAttr.url + END_DATA_XML + "</node>\n")).distinct.collect

outFile.write("<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n" +
"<graphml xmlns=\"http://graphml.graphdrawing.org/xmlns\"\n" +
" xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\"\n" +
" xsi:schemaLocation=\"http://graphml.graphdrawing.org/xmlns\n" +
" http://graphml.graphdrawing.org/xmlns/1.0/graphml.xsd\">\n" +
"<key id=\"label\" for=\"node\" attr.name=\"label\" attr.type=\"string\" />\n" +
"<key id=\"weight\" for=\"edge\" attr.name=\"weight\" attr.type=\"double\">\n" +
"<default>0.0</default>\n" +
"</key>\n" +
"<key id=\"pageRank\" for=\"node\" attr.name=\"pageRank\" " +
"attr.type=\"double\" />\n" +
"<key id=\"stronglyConnectedComponent\" for=\"node\" " +
"attr.name=\"stronglyConnectedComponent\" attr.type=\"int\" />\n" +
"<key id=\"connectedComponent\" for=\"node\" " +
"attr.name=\"connectedComponent\" attr.type=\"int\" />\n" +
"<graph mode=\"static\" edgedefault=\"directed\">\n")
outFile.write("<nodes>\n")
mappedGraph.foreach(r => outFile.write(r(1) + r(2)))
outFile.write("\n</nodes>\n<edges>\n")
mappedGraph.foreach(r => outFile.write(r(0)))
outFile.write("\n</edges>\n")
outFile.write("</graph>\n" +
"</graphml>")
outFile.close()
true
}
}
4 changes: 1 addition & 3 deletions src/main/scala/io/archivesunleashed/df/package.scala
@@ -26,7 +26,7 @@ package object df {
* @param df the input dataframe
*/
implicit class SaveImage(df: DataFrame) {
/**
/**
* @param bytesColumnName the name of the column containing the image bytes
* @param fileName the name of the file to save the images to (without extension)
* e.g. fileName = "foo" => images are saved as foo0.jpg, foo1.jpg
@@ -53,10 +53,8 @@
ImageIO.write(image, format, file);
}
}
row
} catch {
case e: Throwable => {
row
}
}
})