Add ExtractGraphX including algorithms for PageRank and Components. Issue 203 #245

Merged: 34 commits, Jul 29, 2018
Commits
62dfb3f
pom.xml change for GraphX
hardiksahi May 4, 2018
2778bf5
pom.xml change for GraphX
hardiksahi May 4, 2018
13e6723
Changes for GraphXSLS
hardiksahi May 4, 2018
5f5a4b0
Changes for GraphXSLS
hardiksahi May 4, 2018
8adb2b3
Changes for SLS graph
hardiksahi May 4, 2018
54c9133
Changes for SLS graph
hardiksahi May 4, 2018
d64fe13
Change
hardiksahi May 4, 2018
3f63b3c
Changes
hardiksahi May 7, 2018
e64e298
Changes
hardiksahi May 7, 2018
e22f01e
Changes
hardiksahi May 7, 2018
37e9aa7
Changes
hardiksahi May 7, 2018
2b81550
Changes
hardiksahi May 7, 2018
ff7dd7d
Changes
hardiksahi May 7, 2018
afba7b6
Changes
hardiksahi May 8, 2018
41f6ef8
Changes for GraphX
hardiksahi May 8, 2018
1ddd484
Changes
hardiksahi May 8, 2018
12c3ded
Changes
hardiksahi May 8, 2018
eeb18c2
Changes for GraphX
hardiksahi May 9, 2018
e5c9be7
Changes
hardiksahi May 9, 2018
6fdadc5
Changes
hardiksahi May 9, 2018
ae434c3
Changes
hardiksahi May 10, 2018
7077035
Changes for converting WARC RDD to GraphX object
hardiksahi May 17, 2018
ef0ad13
Merge branch 'master' of github.com:hardiksahi/aut into issue-203
ruebot Jul 25, 2018
a02a74e
Make the TravisCI build less verbose since we're hitting the 4MB log …
ruebot Jul 25, 2018
e0c95fd
Pin site.plugin and project-info-reports.plugin so mvn site builds.
ruebot Jul 25, 2018
a6aa179
- Rename extractor to ExtractGraphX
greebie Jul 26, 2018
00879e2
Lint fixes.
greebie Jul 26, 2018
e87ab2b
Setup GraphX test file.
greebie Jul 26, 2018
1475faa
Revise GraphX test for more completeness.
greebie Jul 26, 2018
3d32493
Minor fixes to ExtractGraphXTest.
greebie Jul 26, 2018
d33c06f
Fix typo errors in WriteGraphXML, and tested it properly in Gephi thi…
greebie Jul 27, 2018
0657870
Merge branch 'master' into issue-203
greebie Jul 27, 2018
9b12932
Add WriteGraphXMLTest.
greebie Jul 27, 2018
d1d1603
Merge branch 'issue-203' of https://github.com/archivesunleashed/aut …
greebie Jul 27, 2018
4 changes: 2 additions & 2 deletions src/main/scala/io/archivesunleashed/ArchiveRecord.scala
@@ -98,14 +98,14 @@ class ArchiveRecordImpl(r: SerializableWritable[ArchiveRecordWritable]) extends
new String(getContentBytes)
}

val getMimeType = {
val getMimeType: String = {
Member:

Just a quick question - what's this change for?

Contributor (Author):

There are a number of lint change requests for scalastyle that need to be addressed.

I decided to pick some of these off to reduce the errors that show up in the build.

Part of that is requiring explicit types for any public method. I'm pretty sure MIME types are always strings.
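
To illustrate the scalastyle rule being addressed, here is a minimal sketch (a hypothetical example, not code from this PR) of why explicit result types on public members matter:

```scala
// Minimal illustration (hypothetical example, not from this PR).
// Without an explicit result type, the compiler infers one, and a later
// edit to the body can silently change a public signature.
object ExplicitTypeExample {
  // Inferred as String today; flagged by scalastyle's explicit-type rule
  // for public members.
  val inferredMimeType = "text/html"

  // Explicit annotation keeps the public API stable and satisfies the rule.
  val explicitMimeType: String = "text/html"

  def main(args: Array[String]): Unit =
    println(inferredMimeType + " " + explicitMimeType)
}
```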

if (r.t.getFormat == ArchiveRecordWritable.ArchiveFormat.ARC)
arcRecord.getMetaData.getMimetype
else
WarcRecordUtils.getWarcResponseMimeType(getContentBytes)
}

val getUrl = {
val getUrl: String = {
if (r.t.getFormat == ArchiveRecordWritable.ArchiveFormat.ARC)
arcRecord.getMetaData.getUrl
else
src/main/scala/io/archivesunleashed/app/DomainFrequencyExtractor.scala
@@ -17,7 +17,7 @@

package io.archivesunleashed.app

import io.archivesunleashed._
import io.archivesunleashed.{ArchiveRecord, DataFrameLoader, CountableRDD}
import io.archivesunleashed.matchbox
import io.archivesunleashed.df
import org.apache.spark.rdd.RDD
@@ -30,7 +30,7 @@ object DomainFrequencyExtractor {
* @param records RDD[ArchiveRecord] obtained from RecordLoader
* @return RDD[(String,Int))], which holds (DomainName, DomainFrequency)
*/
def apply(records: RDD[ArchiveRecord]) = {
def apply(records: RDD[ArchiveRecord]): RDD[(String, Int)] = {
records
.keepValidPages()
.map(r => matchbox.ExtractDomain(r.getUrl))
@@ -44,7 +44,9 @@
*/
def apply(d: DataFrame): Dataset[Row] = {
val spark = SparkSession.builder().master("local").getOrCreate()
import spark.implicits._
// scalastyle:off
import spark.implicits._
// scalastyle:on

d.select(df.ExtractBaseDomain($"Url").as("Domain"))
.groupBy("Domain").count().orderBy(desc("count"))
src/main/scala/io/archivesunleashed/app/DomainGraphExtractor.scala
@@ -17,7 +17,7 @@

package io.archivesunleashed.app

import io.archivesunleashed._
import io.archivesunleashed.{ArchiveRecord, DataFrameLoader, CountableRDD}
import io.archivesunleashed.matchbox.{ExtractDomain, ExtractLinks}
import io.archivesunleashed.df
import org.apache.spark.rdd.RDD
@@ -31,7 +31,7 @@ object DomainGraphExtractor {
* @return RDD[(String, String, String), Int],
* which holds ((CrawlDate, SourceDomain, DestinationDomain), Frequency)
*/
def apply(records: RDD[ArchiveRecord]) = {
def apply(records: RDD[ArchiveRecord]): RDD[((String, String, String), Int)] = {
records
.keepValidPages()
.map(r => (r.getCrawlDate, ExtractLinks(r.getUrl, r.getContentString)))
@@ -52,8 +52,9 @@
*/
def apply(d: DataFrame): Dataset[Row] = {
val spark = SparkSession.builder().master("local").getOrCreate()
// scalastyle:off
import spark.implicits._

// scalastyle:on
d.select($"CrawlDate",
df.RemovePrefixWWW(df.ExtractBaseDomain($"Src")).as("SrcDomain"),
df.RemovePrefixWWW(df.ExtractBaseDomain($"Dest")).as("DestDomain"))
11 changes: 7 additions & 4 deletions src/main/scala/io/archivesunleashed/app/ExtractGraph.scala
@@ -16,14 +16,17 @@
*/
package io.archivesunleashed.app

import io.archivesunleashed._
import io.archivesunleashed.matchbox._
import io.archivesunleashed.ArchiveRecord
import io.archivesunleashed.matchbox.{ExtractLinks, ExtractDomain, WWWLink}
import io.archivesunleashed.util.JsonUtils
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

/** Extracts a network graph using Spark's GraphX utility. */
@deprecated("Use ExtractGraphX instead.", "0.16.1")
object ExtractGraph {
val TOLERANCE: Double = 0.005
val NUM_ITER: Int = 20

/** Creates a hashcode from a url to use as a unique id.
*
@@ -48,7 +51,7 @@ object ExtractGraph {
* @return a Graph object containing data for vertices and edges as extracted.
*/
def apply(records: RDD[ArchiveRecord], dynamic: Boolean = false,
tolerance: Double = 0.005, numIter: Int = 20): Graph[VertexData, EdgeData] = {
tolerance: Double = TOLERANCE, numIter: Int = NUM_ITER): Graph[VertexData, EdgeData] = {
val extractedLinks = records.keepValidPages()
.map(r => (r.getCrawlDate, ExtractLinks(r.getUrl, r.getContentString)))
.flatMap(r => r._2.map(f => (r._1, ExtractDomain(f._1).removePrefixWWW(), ExtractDomain(f._2).removePrefixWWW())))
@@ -94,7 +97,7 @@ object ExtractGraph {
* @param edgesPath Filepath for edges output
* @return Unit().
*/
def writeAsJson(verticesPath: String, edgesPath: String) = {
def writeAsJson(verticesPath: String, edgesPath: String): Unit = {
// Combine edges of a given (date, src, dst) combination into single record with count value.
val edgesCounted = graph.edges.countItems().map {
r => Map("date" -> r._1.attr.date,
95 changes: 95 additions & 0 deletions src/main/scala/io/archivesunleashed/app/ExtractGraphX.scala
@@ -0,0 +1,95 @@
/*
* Archives Unleashed Toolkit (AUT):
* An open-source platform for analyzing web archives.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.archivesunleashed.app

import io.archivesunleashed._
import io.archivesunleashed.matchbox._
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

/** Extracts a site link structure using Spark's GraphX utility. */
object ExtractGraphX {

/** Creates a hashcode from a url to use as a unique id.
*
* @param url page url to hash into a vertex id.
* @return unique id as long integer.
*/
def pageHash(url: String): VertexId = {
url.hashCode.toLong
}
val DEF_TOLERANCE = 0.005
val DEF_NUMITER = 20
val DEF_RESET = 0.15
/* convergence tolerance for dynamic page rank (smaller is more accurate) */
var tolerance: Double = DEF_TOLERANCE
/* number of iterations for page rank and strongly connected components */
var numIter: Int = DEF_NUMITER
/* probability page rank "walk" will start from a random position */
var resetProbability: Double = DEF_RESET
/* whether to calculate dynamic page rank */
var dynamic: Boolean = false
/* default page rank value */
val defaultPR: Double = 0.0
val defaultComponent: Long = 0
case class VertexData(url: String)
case class EdgeData(edgeCount: Int)
case class VertexDataPR(url: String, pageRank: Double, weak: Long, strong: Long)

/** Creates a GraphX object
*
* @param records an RDD of tuples (source, destination)
* @return a GraphX object
*/
def extractGraphX(records: RDD[(String, String)]): Graph[VertexData, EdgeData] = {
val extractedLinks = records.persist()
val vertices: RDD[(VertexId, VertexData)] = extractedLinks
.flatMap(r => List(r._1, r._2))
.distinct
.map(r => (pageHash(r), VertexData(r)))
val edges: RDD[Edge[EdgeData]] = extractedLinks
.map(r => Edge(pageHash(r._1), pageHash(r._2), EdgeData(1)))
val graph = Graph(vertices, edges)
.partitionBy(PartitionStrategy.RandomVertexCut)
.groupEdges((e1,e2) => EdgeData(e1.edgeCount + e2.edgeCount))
graph
}

/** Calculates basic graph data (PageRank, weak and strong components) for a graph
*
* @param graph GraphX object
* @return new graph object with additional attributes
*/
def runPageRankAlgorithm(graph: Graph[VertexData, EdgeData]): Graph[VertexDataPR, EdgeData] = {
if(dynamic){
graph.outerJoinVertices(graph.pageRank(numIter, resetProbability).vertices) {
case (id, vd, pr) => (vd, pr)}
.outerJoinVertices(graph.connectedComponents().vertices) {
case (id, (vd, pr), cc) => (vd, pr, cc)}
.outerJoinVertices(graph.stronglyConnectedComponents(numIter).vertices) {
case (id, (vd, pr, cc), scc) => VertexDataPR(vd.url, pr.getOrElse(defaultPR), cc.getOrElse(defaultComponent), scc.getOrElse(defaultComponent)) }
}
else{
graph.outerJoinVertices(graph.staticPageRank(numIter, resetProbability).vertices){
case (id, vd, pr) => (vd, pr)
}.outerJoinVertices(graph.connectedComponents().vertices) {
case (id, (vd, pr), cc) => (vd, pr, cc)}
.outerJoinVertices(graph.stronglyConnectedComponents(numIter).vertices) {
case (id, (vd, pr, cc), scc) => VertexDataPR(vd.url, pr.getOrElse(defaultPR), cc.getOrElse(defaultComponent), scc.getOrElse(defaultComponent)) }
}
}
}
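
For context, a minimal spark-shell-style usage sketch of the new extractor (the archive path, the SparkContext `sc`, and the link-extraction pipeline shown here are illustrative assumptions, not part of this diff):

```scala
// Hypothetical usage sketch: build a (source, destination) link RDD from a
// web archive, turn it into a GraphX graph, and run the PageRank/components
// pass added in this PR. The path and SparkContext (sc) are placeholders.
import io.archivesunleashed._
import io.archivesunleashed.app.ExtractGraphX
import io.archivesunleashed.matchbox.{ExtractDomain, ExtractLinks}

val links = RecordLoader.loadArchives("/path/to/warcs", sc)
  .keepValidPages()
  .flatMap(r => ExtractLinks(r.getUrl, r.getContentString))
  .map(r => (ExtractDomain(r._1), ExtractDomain(r._2)))
  .filter(r => r._1 != "" && r._2 != "")

val graph = ExtractGraphX.extractGraphX(links)          // Graph[VertexData, EdgeData]
val ranked = ExtractGraphX.runPageRankAlgorithm(graph)  // adds PageRank + components
```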
src/main/scala/io/archivesunleashed/app/ExtractPopularImages.scala
@@ -23,6 +23,10 @@ import org.apache.spark.{RangePartitioner, SparkContext}

/** Extract most popular images from an RDD. */
object ExtractPopularImages {
val LIMIT_MAXIMUM: Int = 500
val LIMIT_DENOMINATOR: Int = 250
val MIN_WIDTH: Int = 30
val MIN_HEIGHT: Int = 30

/** Extracts the <i>n</i> most popular images from an RDD within a given size range.
*
@@ -32,7 +36,7 @@ object ExtractPopularImages {
* @param minWidth of image
* @param minHeight of image
*/
def apply(records: RDD[ArchiveRecord], limit: Int, sc:SparkContext, minWidth: Int = 30, minHeight: Int = 30) = {
def apply(records: RDD[ArchiveRecord], limit: Int, sc:SparkContext, minWidth: Int = MIN_WIDTH, minHeight: Int = MIN_HEIGHT): RDD[String] = {
val res = records
.keepImages()
.map(r => ((r.getUrl, r.getImageBytes), 1))
@@ -42,7 +46,7 @@
.map(x=> (x._2._3, x._2._2))
.takeOrdered(limit)(Ordering[Int].on(x => -x._1))
res.foreach(x => println(x._1 + "\t" + x._2))
val numPartitions = if (limit <= 500) 1 else Math.ceil(limit / 250).toInt
val numPartitions = if (limit <= LIMIT_MAXIMUM) 1 else Math.ceil(limit / LIMIT_DENOMINATOR).toInt
val rdd = sc.parallelize(res)
rdd.repartitionAndSortWithinPartitions(
new RangePartitioner(numPartitions, rdd, false)).sortByKey(false).map(x=>x._1 + "\t" + x._2)
2 changes: 1 addition & 1 deletion src/main/scala/io/archivesunleashed/app/WriteGraphML.scala
@@ -71,6 +71,6 @@ object WriteGraphML {
outFile.write("</graph>\n" +
"</graphml>")
outFile.close()
return true
true
}
}
95 changes: 95 additions & 0 deletions src/main/scala/io/archivesunleashed/app/WriteGraphXML.scala
@@ -0,0 +1,95 @@
/*
* Archives Unleashed Toolkit (AUT):
* An open-source platform for analyzing web archives.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.archivesunleashed.app

import io.archivesunleashed.matchbox._

import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}
import io.archivesunleashed.app.ExtractGraphX.{VertexData,EdgeData,VertexDataPR}
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

/**
* UDF for exporting a GraphX object representing a collection of links to a GraphML file.
*/
object WriteGraphXML {
/** Writes graphX object nodes and edges to file.
*
* @param graph GraphX object of type Graph[VertexDataPR, EdgeData]
* @param graphmlPath output file
*/
def apply(graph: Graph[VertexDataPR, EdgeData], graphmlPath: String): Boolean = {
if (graphmlPath.isEmpty()) {
false
} else {
makeFile (graph, graphmlPath)
}
}

/** Produces the GraphML output from a GraphX object and outputs it to graphmlPath.
*
* @param graph GraphX object of type Graph[VertexDataPR, EdgeData]
* @param graphmlPath output file
* @return true on successful run.
*/
def makeFile (graph: Graph[VertexDataPR, EdgeData], graphmlPath: String): Boolean = {
val outFile = Files.newBufferedWriter(Paths.get(graphmlPath), StandardCharsets.UTF_8)

val END_DATA_XML: String = "</data>\n"

val mappedGraph = graph.triplets.map(triplet => List("<edge source=\"" + triplet.srcId + "\" target=\"" +
triplet.dstId + "\" type=\"directed\">\n" +
"<data key=\"weight\">" + triplet.attr.edgeCount + END_DATA_XML +
"</edge>\n", "<node id=\"" + triplet.srcId + "\">\n" +
"<data key=\"pageRank\">" + triplet.srcAttr.pageRank + END_DATA_XML +
"<data key=\"connectedComponent\">" + triplet.srcAttr.weak + END_DATA_XML +
"<data key=\"stronglyConnectedComponent\">" + triplet.srcAttr.strong + END_DATA_XML +
"<data key=\"label\">" + triplet.srcAttr.url + END_DATA_XML + "</node>\n",
"<node id=\"" + triplet.dstId + "\">\n" +
"<data key=\"pageRank\">" + triplet.dstAttr.pageRank + END_DATA_XML +
"<data key=\"connectedComponent\">" + triplet.dstAttr.weak + END_DATA_XML +
"<data key=\"stronglyConnectedComponent\">" + triplet.dstAttr.strong + END_DATA_XML +
"<data key=\"label\">" + triplet.dstAttr.url + END_DATA_XML + "</node>\n")).distinct.collect

outFile.write("<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n" +
"<graphml xmlns=\"http://graphml.graphdrawing.org/xmlns\"\n" +
" xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\"\n" +
" xsi:schemaLocation=\"http://graphml.graphdrawing.org/xmlns\n" +
" http://graphml.graphdrawing.org/xmlns/1.0/graphml.xsd\">\n" +
"<key id=\"label\" for=\"node\" attr.name=\"label\" attr.type=\"string\" />\n" +
"<key id=\"weight\" for=\"edge\" attr.name=\"weight\" attr.type=\"double\">\n" +
"<default>0.0</default>\n" +
"</key>\n" +
"<key id=\"pageRank\" for=\"node\" attr.name=\"pageRank\" " +
"attr.type=\"double\" />\n" +
"<key id=\"stronglyConnectedComponent\" for=\"node\" " +
"attr.name=\"stronglyConnectedComponent\" attr.type=\"int\" />\n" +
"<key id=\"connectedComponent\" for=\"node\" " +
"attr.name=\"connectedComponent\" attr.type=\"int\" />\n" +
"<graph mode=\"static\" edgedefault=\"directed\">\n")
outFile.write("<nodes>\n")
mappedGraph.foreach(r => outFile.write(r(1) + r(2)))
outFile.write("\n</nodes>\n<edges>\n")
mappedGraph.foreach(r => outFile.write(r(0)))
outFile.write("\n</edges>\n")
outFile.write("</graph>\n" +
"</graphml>")
outFile.close()
true
}
}
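
Continuing the sketch above, writing the PageRank-annotated graph out for a tool like Gephi might look like this (the output path is a placeholder, and `ranked` is the graph produced in the earlier sketch):

```scala
// Hypothetical follow-on to the ExtractGraphX sketch: serialize the ranked
// graph to GraphML. Returns false if the output path is empty, true otherwise.
import io.archivesunleashed.app.WriteGraphXML

val wrote: Boolean = WriteGraphXML(ranked, "/path/to/output/example-links.graphml")
```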
4 changes: 1 addition & 3 deletions src/main/scala/io/archivesunleashed/df/package.scala
@@ -26,7 +26,7 @@ package object df {
* @param df the input dataframe
*/
implicit class SaveImage(df: DataFrame) {
/**
/**
* @param bytesColumnName the name of the column containing the image bytes
* @param fileName the name of the file to save the images to (without extension)
* e.g. fileName = "foo" => images are saved as foo0.jpg, foo1.jpg
@@ -53,10 +53,8 @@
ImageIO.write(image, format, file);
}
}
row
} catch {
case e: Throwable => {
row
}
}
})