diff --git a/src/main/scala/io/archivesunleashed/ArchiveRecord.scala b/src/main/scala/io/archivesunleashed/ArchiveRecord.scala
index ca0a76fe..04a6eadc 100644
--- a/src/main/scala/io/archivesunleashed/ArchiveRecord.scala
+++ b/src/main/scala/io/archivesunleashed/ArchiveRecord.scala
@@ -98,14 +98,14 @@ class ArchiveRecordImpl(r: SerializableWritable[ArchiveRecordWritable]) extends
     new String(getContentBytes)
   }

-  val getMimeType = {
+  val getMimeType: String = {
     if (r.t.getFormat == ArchiveRecordWritable.ArchiveFormat.ARC)
       arcRecord.getMetaData.getMimetype
     else
       WarcRecordUtils.getWarcResponseMimeType(getContentBytes)
   }

-  val getUrl = {
+  val getUrl: String = {
     if (r.t.getFormat == ArchiveRecordWritable.ArchiveFormat.ARC)
       arcRecord.getMetaData.getUrl
     else
diff --git a/src/main/scala/io/archivesunleashed/app/DomainFrequencyExtractor.scala b/src/main/scala/io/archivesunleashed/app/DomainFrequencyExtractor.scala
index 6fdcbec5..accb0c3c 100644
--- a/src/main/scala/io/archivesunleashed/app/DomainFrequencyExtractor.scala
+++ b/src/main/scala/io/archivesunleashed/app/DomainFrequencyExtractor.scala
@@ -17,7 +17,7 @@
 package io.archivesunleashed.app

-import io.archivesunleashed._
+import io.archivesunleashed.{ArchiveRecord, DataFrameLoader, CountableRDD}
 import io.archivesunleashed.matchbox
 import io.archivesunleashed.df
 import org.apache.spark.rdd.RDD
@@ -30,7 +30,7 @@ object DomainFrequencyExtractor {
    * @param records RDD[ArchiveRecord] obtained from RecordLoader
    * @return RDD[(String, Int)], which holds (DomainName, DomainFrequency)
    */
-  def apply(records: RDD[ArchiveRecord]) = {
+  def apply(records: RDD[ArchiveRecord]): RDD[(String, Int)] = {
     records
       .keepValidPages()
       .map(r => matchbox.ExtractDomain(r.getUrl))
@@ -44,7 +44,9 @@
    */
   def apply(d: DataFrame): Dataset[Row] = {
     val spark = SparkSession.builder().master("local").getOrCreate()
-    import spark.implicits._
+    // scalastyle:off
+    import spark.implicits._
+    // scalastyle:on

     d.select(df.ExtractBaseDomain($"Url").as("Domain"))
       .groupBy("Domain").count().orderBy(desc("count"))
diff --git a/src/main/scala/io/archivesunleashed/app/DomainGraphExtractor.scala b/src/main/scala/io/archivesunleashed/app/DomainGraphExtractor.scala
index 8e538f51..fc151cf8 100644
--- a/src/main/scala/io/archivesunleashed/app/DomainGraphExtractor.scala
+++ b/src/main/scala/io/archivesunleashed/app/DomainGraphExtractor.scala
@@ -17,7 +17,7 @@
 package io.archivesunleashed.app

-import io.archivesunleashed._
+import io.archivesunleashed.{ArchiveRecord, DataFrameLoader, CountableRDD}
 import io.archivesunleashed.matchbox.{ExtractDomain, ExtractLinks}
 import io.archivesunleashed.df
 import org.apache.spark.rdd.RDD
@@ -31,7 +31,7 @@ object DomainGraphExtractor {
    * @return RDD[((String, String, String), Int)],
    * which holds ((CrawlDate, SourceDomain, DestinationDomain), Frequency)
    */
-  def apply(records: RDD[ArchiveRecord]) = {
+  def apply(records: RDD[ArchiveRecord]): RDD[((String, String, String), Int)] = {
     records
       .keepValidPages()
       .map(r => (r.getCrawlDate, ExtractLinks(r.getUrl, r.getContentString)))
@@ -52,8 +52,9 @@
    */
   def apply(d: DataFrame): Dataset[Row] = {
     val spark = SparkSession.builder().master("local").getOrCreate()
+    // scalastyle:off
     import spark.implicits._
-
+    // scalastyle:on
     d.select($"CrawlDate",
       df.RemovePrefixWWW(df.ExtractBaseDomain($"Src")).as("SrcDomain"),
       df.RemovePrefixWWW(df.ExtractBaseDomain($"Dest")).as("DestDomain"))
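
Usage sketch for the two extractors above. This is a minimal, hedged example, assuming a spark-shell session with the aut jar on the classpath, an in-scope SparkContext sc, and a placeholder archive path:

import io.archivesunleashed._
import io.archivesunleashed.app.{DomainFrequencyExtractor, DomainGraphExtractor}
import org.apache.spark.rdd.RDD

// The annotations added above make the result types checkable at the call site.
val records = RecordLoader.loadArchives("/path/to/example.arc.gz", sc)
val domainFreq: RDD[(String, Int)] = DomainFrequencyExtractor(records)
val domainGraph: RDD[((String, String, String), Int)] = DomainGraphExtractor(records)
domainFreq.take(10).foreach(println)
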
diff --git a/src/main/scala/io/archivesunleashed/app/ExtractGraph.scala b/src/main/scala/io/archivesunleashed/app/ExtractGraph.scala
index 38707dfb..78f3b142 100644
--- a/src/main/scala/io/archivesunleashed/app/ExtractGraph.scala
+++ b/src/main/scala/io/archivesunleashed/app/ExtractGraph.scala
@@ -16,14 +16,17 @@
  */
 package io.archivesunleashed.app

-import io.archivesunleashed._
-import io.archivesunleashed.matchbox._
+import io.archivesunleashed.ArchiveRecord
+import io.archivesunleashed.matchbox.{ExtractLinks, ExtractDomain, WWWLink}
 import io.archivesunleashed.util.JsonUtils
 import org.apache.spark.graphx._
 import org.apache.spark.rdd.RDD

 /** Extracts a network graph using Spark's GraphX utility. */
+@deprecated("Use ExtractGraphX instead.", "0.16.1")
 object ExtractGraph {
+  val TOLERANCE: Double = 0.005
+  val NUM_ITER: Int = 20

   /** Creates a hashcode from a url to use as a unique id.
    *
    * @param url
    * @return unique id as long integer.
    *
@@ -48,7 +51,7 @@
    * @return a Graph object containing data for vertices and edges as extracted.
    */
   def apply(records: RDD[ArchiveRecord], dynamic: Boolean = false,
-    tolerance: Double = 0.005, numIter: Int = 20): Graph[VertexData, EdgeData] = {
+    tolerance: Double = TOLERANCE, numIter: Int = NUM_ITER): Graph[VertexData, EdgeData] = {
     val extractedLinks = records.keepValidPages()
       .map(r => (r.getCrawlDate, ExtractLinks(r.getUrl, r.getContentString)))
       .flatMap(r => r._2.map(f => (r._1, ExtractDomain(f._1).removePrefixWWW(), ExtractDomain(f._2).removePrefixWWW())))
@@ -94,7 +97,7 @@
    * @param edgesPath Filepath for edges output
    * @return Unit().
    */
-  def writeAsJson(verticesPath: String, edgesPath: String) = {
+  def writeAsJson(verticesPath: String, edgesPath: String): Unit = {
     // Combine edges of a given (date, src, dst) combination into single record with count value.
     val edgesCounted = graph.edges.countItems().map { r =>
       Map("date" -> r._1.attr.date,
diff --git a/src/main/scala/io/archivesunleashed/app/ExtractGraphX.scala b/src/main/scala/io/archivesunleashed/app/ExtractGraphX.scala
new file mode 100644
index 00000000..3f1411af
--- /dev/null
+++ b/src/main/scala/io/archivesunleashed/app/ExtractGraphX.scala
@@ -0,0 +1,95 @@
+/*
+ * Archives Unleashed Toolkit (AUT):
+ * An open-source platform for analyzing web archives.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.archivesunleashed.app
+
+import io.archivesunleashed._
+import io.archivesunleashed.matchbox._
+import org.apache.spark.graphx._
+import org.apache.spark.rdd.RDD
+
+/** Extracts a site link structure using Spark's GraphX utility. */
+object ExtractGraphX {
+
+  /** Creates a hashcode from a url to use as a unique id.
+    *
+    * @param url
+    * @return unique id as long integer.
+    */
+  def pageHash(url: String): VertexId = {
+    url.hashCode.toLong
+  }
+  val DEF_TOLERANCE = 0.005
+  val DEF_NUMITER = 20
+  val DEF_RESET = 0.15
+  /* tolerance level (probability of following a link) for page rank */
+  var tolerance: Double = DEF_TOLERANCE
+  /* number of iterations for page rank and strongly connected components */
+  var numIter: Int = DEF_NUMITER
+  /* probability page rank "walk" will start from a random position */
+  var resetProbability: Double = DEF_RESET
+  /* whether to calculate dynamic page rank */
+  var dynamic: Boolean = false
+  /* default page rank value */
+  val defaultPR: Double = 0.0
+  val defaultComponent: Long = 0
+  case class VertexData(url: String)
+  case class EdgeData(edgeCount: Int)
+  case class VertexDataPR(url: String, pageRank: Double, weak: Long, strong: Long)
+
+  /** Creates a GraphX object
+    *
+    * @param records an RDD of tuples (source, destination)
+    * @return a GraphX object
+    */
+  def extractGraphX(records: RDD[(String, String)]): Graph[VertexData, EdgeData] = {
+    val extractedLinks = records.persist()
+    val vertices: RDD[(VertexId, VertexData)] = extractedLinks
+      .flatMap(r => List(r._1, r._2))
+      .distinct
+      .map(r => (pageHash(r), VertexData(r)))
+    val edges: RDD[Edge[EdgeData]] = extractedLinks
+      .map(r => Edge(pageHash(r._1), pageHash(r._2), EdgeData(1)))
+    val graph = Graph(vertices, edges)
+      .partitionBy(PartitionStrategy.RandomVertexCut)
+      .groupEdges((e1,e2) => EdgeData(e1.edgeCount + e2.edgeCount))
+    graph
+  }
+
+  /** Calculates basic graph data (Page Rank, weak and strong components) for Graph
+    *
+    * @param graph GraphX object
+    * @return new graph object with additional attributes
+    */
+  def runPageRankAlgorithm(graph: Graph[VertexData, EdgeData]): Graph[VertexDataPR, EdgeData] = {
+    if(dynamic){
+      graph.outerJoinVertices(graph.pageRank(tolerance, resetProbability).vertices) {
+        case (id, vd, pr) => (vd, pr)}
+        .outerJoinVertices(graph.connectedComponents().vertices) {
+        case (id, (vd, pr), cc) => (vd, pr, cc)}
+        .outerJoinVertices(graph.stronglyConnectedComponents(numIter).vertices) {
+        case (id, (vd, pr, cc), scc) => VertexDataPR(vd.url, pr.getOrElse(defaultPR), cc.getOrElse(defaultComponent), scc.getOrElse(defaultComponent)) }
+    }
+    else{
+      graph.outerJoinVertices(graph.staticPageRank(numIter, resetProbability).vertices){
+        case (id, vd, pr) => (vd, pr)
+      }.outerJoinVertices(graph.connectedComponents().vertices) {
+        case (id, (vd, pr), cc) => (vd, pr, cc)}
+        .outerJoinVertices(graph.stronglyConnectedComponents(numIter).vertices) {
+        case (id, (vd, pr, cc), scc) => VertexDataPR(vd.url, pr.getOrElse(defaultPR), cc.getOrElse(defaultComponent), scc.getOrElse(defaultComponent)) }
+    }
+  }
+}
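
A sketch of driving the new ExtractGraphX end to end; it mirrors ExtractGraphXTest further down, with the archive path as a placeholder:

import io.archivesunleashed._
import io.archivesunleashed.app.ExtractGraphX
import io.archivesunleashed.matchbox._

// Reduce pages to (source domain, destination domain) pairs, then build the graph.
val links = RecordLoader.loadArchives("/path/to/example.arc.gz", sc)
  .keepValidPages()
  .flatMap(r => ExtractLinks(r.getUrl, r.getContentString))
  .map(r => (ExtractDomain(r._1).removePrefixWWW(), ExtractDomain(r._2).removePrefixWWW()))
  .filter(r => r._1 != "" && r._2 != "")

val graph = ExtractGraphX.extractGraphX(links)
// Static PageRank by default; set ExtractGraphX.dynamic = true to iterate until tolerance instead.
val ranked = ExtractGraphX.runPageRankAlgorithm(graph)
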
diff --git a/src/main/scala/io/archivesunleashed/app/ExtractPopularImages.scala b/src/main/scala/io/archivesunleashed/app/ExtractPopularImages.scala
index 99702259..47e85758 100644
--- a/src/main/scala/io/archivesunleashed/app/ExtractPopularImages.scala
+++ b/src/main/scala/io/archivesunleashed/app/ExtractPopularImages.scala
@@ -23,6 +23,10 @@
 import org.apache.spark.{RangePartitioner, SparkContext}

 /** Extract most popular images from an RDD. */
 object ExtractPopularImages {
+  val LIMIT_MAXIMUM: Int = 500
+  val LIMIT_DENOMINATOR: Int = 250
+  val MIN_WIDTH: Int = 30
+  val MIN_HEIGHT: Int = 30

   /** Extracts the <i>n</i> most popular images from an RDD within a given size range.
    *
@@ -32,7 +36,7 @@ object ExtractPopularImages {
    * @param minWidth of image
    * @param minHeight of image
    */
-  def apply(records: RDD[ArchiveRecord], limit: Int, sc:SparkContext, minWidth: Int = 30, minHeight: Int = 30) = {
+  def apply(records: RDD[ArchiveRecord], limit: Int, sc: SparkContext, minWidth: Int = MIN_WIDTH, minHeight: Int = MIN_HEIGHT): RDD[String] = {
     val res = records
       .keepImages()
       .map(r => ((r.getUrl, r.getImageBytes), 1))
@@ -42,7 +46,7 @@
       .map(x=> (x._2._3, x._2._2))
       .takeOrdered(limit)(Ordering[Int].on(x => -x._1))
     res.foreach(x => println(x._1 + "\t" + x._2))
-    val numPartitions = if (limit <= 500) 1 else Math.ceil(limit / 250).toInt
+    val numPartitions = if (limit <= LIMIT_MAXIMUM) 1 else Math.ceil(limit.toDouble / LIMIT_DENOMINATOR).toInt
     val rdd = sc.parallelize(res)
     rdd.repartitionAndSortWithinPartitions(
       new RangePartitioner(numPartitions, rdd, false)).sortByKey(false).map(x=>x._1 + "\t" + x._2)
diff --git a/src/main/scala/io/archivesunleashed/app/WriteGraphML.scala b/src/main/scala/io/archivesunleashed/app/WriteGraphML.scala
index f60b56c3..00522508 100644
--- a/src/main/scala/io/archivesunleashed/app/WriteGraphML.scala
+++ b/src/main/scala/io/archivesunleashed/app/WriteGraphML.scala
@@ -71,6 +71,6 @@
     outFile.write("</graph>\n" +
       "</graphml>")
     outFile.close()
-    return true
+    true
   }
 }
diff --git a/src/main/scala/io/archivesunleashed/app/WriteGraphXML.scala b/src/main/scala/io/archivesunleashed/app/WriteGraphXML.scala
new file mode 100644
index 00000000..421054f7
--- /dev/null
+++ b/src/main/scala/io/archivesunleashed/app/WriteGraphXML.scala
@@ -0,0 +1,95 @@
+/*
+ * Archives Unleashed Toolkit (AUT):
+ * An open-source platform for analyzing web archives.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.archivesunleashed.app
+
+import io.archivesunleashed.matchbox._
+
+import java.nio.charset.StandardCharsets
+import java.nio.file.{Files, Paths}
+import io.archivesunleashed.app.ExtractGraphX.{VertexData,EdgeData,VertexDataPR}
+import org.apache.spark.graphx._
+import org.apache.spark.rdd.RDD
+
+/**
+ * UDF for exporting a GraphX object representing a collection of links to a GraphML file.
+ */
+object WriteGraphXML {
+  /** Writes graphX object nodes and edges to file.
+    *
+    * @param graph GraphX object of type Graph[VertexDataPR, EdgeData]
+    * @param graphmlPath output file
+    */
+  def apply(graph: Graph[VertexDataPR, EdgeData], graphmlPath: String): Boolean = {
+    if (graphmlPath.isEmpty()) {
+      false
+    } else {
+      makeFile (graph, graphmlPath)
+    }
+  }
+
+  /** Produces the GraphML output from a GraphX object and outputs it to graphmlPath.
+    *
+    * @param graph GraphX object of type Graph[VertexDataPR, EdgeData]
+    * @param graphmlPath output file
+    * @return true on successful run.
+    */
+  def makeFile (graph: Graph[VertexDataPR, EdgeData], graphmlPath: String): Boolean = {
+    val outFile = Files.newBufferedWriter(Paths.get(graphmlPath), StandardCharsets.UTF_8)
+
+    val END_DATA_XML: String = "</data>\n"
+
+    val mappedGraph = graph.triplets.map(triplet => List("<edge source=\"" + triplet.srcId + "\" target=\"" +
+      triplet.dstId + "\" type=\"directed\">\n" +
+      "<data key=\"weight\">" + triplet.attr.edgeCount + END_DATA_XML +
+      "</edge>\n", "<node id=\"" + triplet.srcId + "\">\n" +
+      "<data key=\"pageRank\">" + triplet.srcAttr.pageRank + END_DATA_XML +
+      "<data key=\"connectedComponent\">" + triplet.srcAttr.weak + END_DATA_XML +
+      "<data key=\"stronglyConnectedComponent\">" + triplet.srcAttr.strong + END_DATA_XML +
+      "<data key=\"label\">" + triplet.srcAttr.url + END_DATA_XML + "</node>\n", "<node id=\"" + triplet.dstId + "\">\n" +
+      "<data key=\"pageRank\">" + triplet.dstAttr.pageRank + END_DATA_XML +
+      "<data key=\"connectedComponent\">" + triplet.dstAttr.weak + END_DATA_XML +
+      "<data key=\"stronglyConnectedComponent\">" + triplet.dstAttr.strong + END_DATA_XML +
+      "<data key=\"label\">" + triplet.dstAttr.url + END_DATA_XML + "</node>\n")).distinct.collect
+
+    outFile.write("<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n" +
+      "<graphml xmlns=\"http://graphml.graphdrawing.org/xmlns\"\n" +
+      "  xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\"\n" +
+      "  xsi:schemaLocation=\"http://graphml.graphdrawing.org/xmlns\n" +
+      "  http://graphml.graphdrawing.org/xmlns/1.0/graphml.xsd\">\n" +
+      "<key id=\"label\" for=\"node\" attr.name=\"label\" attr.type=\"string\" />\n" +
+      "<key id=\"weight\" for=\"edge\" attr.name=\"weight\" attr.type=\"double\">\n" +
+      "<default>0.0</default>\n" +
+      "</key>\n" +
+      "<key id=\"pageRank\" for=\"node\" attr.name=\"pageRank\" " +
+      "attr.type=\"double\" />\n" +
+      "<key id=\"stronglyConnectedComponent\" for=\"node\" " +
+      "attr.name=\"stronglyConnectedComponent\" attr.type=\"int\" />\n" +
+      "<key id=\"connectedComponent\" for=\"node\" " +
+      "attr.name=\"connectedComponent\" attr.type=\"int\" />\n" +
+      "<graph mode=\"static\" edgedefault=\"directed\">\n")
+    outFile.write("<nodes>\n")
+    mappedGraph.foreach(r => outFile.write(r(1) + r(2)))
+    outFile.write("\n</nodes>\n<edges>\n")
+    mappedGraph.foreach(r => outFile.write(r(0)))
+    outFile.write("\n</edges>\n")
+    outFile.write("</graph>\n" +
+      "</graphml>")
+    outFile.close()
+    true
+  }
+}
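
Continuing the sketch above, the ranked graph can be serialized with the new writer; the output name is a placeholder, and apply() returns false on an empty path rather than throwing:

import io.archivesunleashed.app.WriteGraphXML

val wrote: Boolean = WriteGraphXML(ranked, "example.graphml")

One caveat worth noting: makeFile deduplicates whole triplet lists rather than individual nodes, so a vertex shared by several edges is emitted as a <node> element more than once.
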
diff --git a/src/main/scala/io/archivesunleashed/df/package.scala b/src/main/scala/io/archivesunleashed/df/package.scala
index b6d1b3c7..1f946b68 100644
--- a/src/main/scala/io/archivesunleashed/df/package.scala
+++ b/src/main/scala/io/archivesunleashed/df/package.scala
@@ -26,7 +26,7 @@ package object df {
    * @param df the input dataframe
    */
   implicit class SaveImage(df: DataFrame) {
-    /** 
+    /**
      * @param bytesColumnName the name of the column containing the image bytes
      * @param fileName the name of the file to save the images to (without extension)
      * e.g. fileName = "foo" => images are saved as foo0.jpg, foo1.jpg
@@ -53,10 +53,8 @@
           ImageIO.write(image, format, file);
         }
       }
-      row
     } catch {
       case e: Throwable => {
-        row
       }
     }
   })
diff --git a/src/main/scala/io/archivesunleashed/package.scala b/src/main/scala/io/archivesunleashed/package.scala
index f0525f75..2b77a77d 100644
--- a/src/main/scala/io/archivesunleashed/package.scala
+++ b/src/main/scala/io/archivesunleashed/package.scala
@@ -150,7 +150,7 @@
         (r.getUrl, r.getMimeType, image.width, image.height, image.hash, image.body)
       })
       .map(t => Row(t._1, t._2, t._3, t._4, t._5, t._6))
-
+
     val schema = new StructType()
       .add(StructField("url", StringType, true))
       .add(StructField("mime_type", StringType, true))
@@ -164,7 +164,7 @@
   }

   /** Removes all data except images. */
-  def keepImages() = {
+  def keepImages(): RDD[ArchiveRecord] = {
     rdd.filter(r =>
       r.getCrawlDate != null
         && (
@@ -179,7 +179,7 @@
    *
    * @param mimeTypes a Set of Mimetypes to keep
    */
-  def keepMimeTypes(mimeTypes: Set[String]) = {
+  def keepMimeTypes(mimeTypes: Set[String]): RDD[ArchiveRecord] = {
     rdd.filter(r => mimeTypes.contains(r.getMimeType))
   }

@@ -188,7 +188,7 @@
    * @param dates a list of dates to keep
    * @param component the selected DateComponent enum value
    */
-  def keepDate(dates: List[String], component: DateComponent = DateComponent.YYYYMMDD) = {
+  def keepDate(dates: List[String], component: DateComponent = DateComponent.YYYYMMDD): RDD[ArchiveRecord] = {
     rdd.filter(r => dates.contains(ExtractDate(r.getCrawlDate, component)))
   }

@@ -196,7 +196,7 @@
    *
    * @param urls a Set of URLs to keep
    */
-  def keepUrls(urls: Set[String]) = {
+  def keepUrls(urls: Set[String]): RDD[ArchiveRecord] = {
     rdd.filter(r => urls.contains(r.getUrl))
   }

@@ -204,7 +204,7 @@
    *
    * @param urlREs a Set of Regular Expressions to keep
    */
-  def keepUrlPatterns(urlREs: Set[Regex]) = {
+  def keepUrlPatterns(urlREs: Set[Regex]): RDD[ArchiveRecord] = {
     rdd.filter(r =>
       urlREs.map(re =>
         r.getUrl match {
@@ -217,7 +217,7 @@
    *
    * @param urls a Set of urls for the source domains to keep
    */
-  def keepDomains(urls: Set[String]) = {
+  def keepDomains(urls: Set[String]): RDD[ArchiveRecord] = {
     rdd.filter(r => urls.contains(ExtractDomain(r.getUrl).replace("^\\s*www\\.", "")))
   }

@@ -225,7 +225,7 @@
    *
    * @param lang a Set of ISO 639-2 codes
    */
-  def keepLanguages(lang: Set[String]) = {
+  def keepLanguages(lang: Set[String]): RDD[ArchiveRecord] = {
     rdd.filter(r => lang.contains(DetectLanguage(RemoveHTML(r.getContentString))))
   }

@@ -233,7 +233,7 @@
    *
    * @param contentREs a list of Regular expressions to keep
    */
-  def keepContent(contentREs: Set[Regex]) = {
+  def keepContent(contentREs: Set[Regex]): RDD[ArchiveRecord] = {
     rdd.filter(r =>
       contentREs.map(re =>
         (re findFirstIn r.getContentString) match {
@@ -246,19 +246,19 @@
    *
    * @param mimeTypes
    */
-  def discardMimeTypes(mimeTypes: Set[String]) = {
+  def discardMimeTypes(mimeTypes: Set[String]): RDD[ArchiveRecord] = {
     rdd.filter(r => !mimeTypes.contains(r.getMimeType))
   }

-  def discardDate(date: String) = {
+  def discardDate(date: String): RDD[ArchiveRecord] = {
     rdd.filter(r => r.getCrawlDate != date)
   }

-  def discardUrls(urls: Set[String]) = {
+  def discardUrls(urls: Set[String]): RDD[ArchiveRecord] = {
     rdd.filter(r => !urls.contains(r.getUrl))
   }

-  def discardUrlPatterns(urlREs: Set[Regex]) = {
+  def discardUrlPatterns(urlREs: Set[Regex]): RDD[ArchiveRecord] = {
     rdd.filter(r =>
       !urlREs.map(re =>
         r.getUrl match {
@@ -267,11 +267,11 @@
         }).exists(identity))
   }

-  def discardDomains(urls: Set[String]) = {
+  def discardDomains(urls: Set[String]): RDD[ArchiveRecord] = {
     rdd.filter(r => !urls.contains(r.getDomain))
   }

-  def discardContent(contentREs: Set[Regex]) = {
+  def discardContent(contentREs: Set[Regex]): RDD[ArchiveRecord] = {
     rdd.filter(r =>
       !contentREs.map(re =>
         (re findFirstIn r.getContentString) match {
diff --git a/src/test/scala/io/archivesunleashed/app/DomainGraphExtractorDfTest.scala b/src/test/scala/io/archivesunleashed/app/DomainGraphExtractorDfTest.scala
index efced487..5123c094 100644
--- a/src/test/scala/io/archivesunleashed/app/DomainGraphExtractorDfTest.scala
+++ b/src/test/scala/io/archivesunleashed/app/DomainGraphExtractorDfTest.scala
@@ -40,13 +40,15 @@
   }

   test("DomainGraphExtractor") {
+    val TESTLENGTH = 166
+    val TESTRESULT = 316
     val df = RecordLoader.loadArchives(arcPath, sc).extractHyperlinksDF()
     val dfResult = DomainGraphExtractor(df).collect()
-    assert(dfResult.length == 166)
+    assert(dfResult.length == TESTLENGTH)
     assert(dfResult(0).get(0) == "20080430")
     assert(dfResult(0).get(1) == "archive.org")
     assert(dfResult(0).get(2) == "archive.org")
-    assert(dfResult(0).get(3) == 316)
+    assert(dfResult(0).get(3) == TESTRESULT)
   }

   after {
diff --git a/src/test/scala/io/archivesunleashed/app/ExtractGraphTest.scala b/src/test/scala/io/archivesunleashed/app/ExtractGraphTest.scala
index 7b549619..00366588 100644
--- a/src/test/scala/io/archivesunleashed/app/ExtractGraphTest.scala
+++ b/src/test/scala/io/archivesunleashed/app/ExtractGraphTest.scala
@@ -33,6 +33,7 @@
 import scala.util.Try

 // TODO:
 // See: https://github.com/archivesunleashed/aut/pull/204/files#diff-4541b9834513985c360b64093fd45073
 //@RunWith(classOf[JUnitRunner])
+ @deprecated("Replaced with ExtractGraphX", "0.16.1")
 class ExtractGraphTest extends FunSuite with BeforeAndAfter {
   private val arcPath = Resources.getResource("arc/example.arc.gz").getPath
   private var sc: SparkContext = _
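
The package.scala annotations above also make the keep/discard filters easier to compose. A small sketch under the same assumptions (placeholder path, in-scope sc):

import io.archivesunleashed._

// Every filter now advertises RDD[ArchiveRecord], so the chain type-checks step by step.
val pages = RecordLoader.loadArchives("/path/to/example.arc.gz", sc)
  .keepValidPages()
  .keepMimeTypes(Set("text/html"))
  .keepContent(Set("apple".r))
  .discardDomains(Set("youtube.com"))
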
diff --git a/src/test/scala/io/archivesunleashed/app/ExtractGraphXTest.scala b/src/test/scala/io/archivesunleashed/app/ExtractGraphXTest.scala
new file mode 100644
index 00000000..395cf375
--- /dev/null
+++ b/src/test/scala/io/archivesunleashed/app/ExtractGraphXTest.scala
@@ -0,0 +1,108 @@
+/*
+ * Archives Unleashed Toolkit (AUT):
+ * An open-source platform for analyzing web archives.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.archivesunleashed.app
+
+import java.io.File
+import java.nio.file.{Files, Paths}
+
+import com.google.common.io.Resources
+import io.archivesunleashed._
+import io.archivesunleashed.matchbox._
+import io.archivesunleashed.app._
+import io.archivesunleashed.util._
+import org.apache.spark.graphx._
+import org.apache.commons.io.FileUtils
+import org.apache.spark.{SparkConf, SparkContext}
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{BeforeAndAfter, FunSuite}
+
+import scala.util.Try
+
+ @RunWith(classOf[JUnitRunner])
+ class ExtractGraphXTest extends FunSuite with BeforeAndAfter {
+   private val arcPath = Resources.getResource("arc/example.arc.gz").getPath
+   private var sc: SparkContext = _
+   private val master = "local[4]"
+   private val appName = "example-spark"
+   private val testVertexFile = "temporaryTestVertexDir"
+   private val testEdgesFile = "temporaryTestEdgesDir"
+
+   before {
+     val conf = new SparkConf()
+       .setMaster(master)
+       .setAppName(appName)
+     conf.set("spark.driver.allowMultipleContexts", "true");
+     sc = new SparkContext(conf)
+   }
+
+   test ("Case classes are empty") {
+     val testPR = ExtractGraphX.VertexDataPR("url", 0.56, 4, 5)
+     val testVertexData = ExtractGraphX.VertexData("url")
+     val testEdgeData = ExtractGraphX.EdgeData(100)
+     assert(testVertexData.url == "url")
+     assert(testEdgeData.edgeCount == 100)
+     assert(testPR.url == "url")
+     assert(testPR.pageRank == 0.56)
+     assert(testPR.weak == 4)
+     assert(testPR.strong == 5)
+   }
+
+   test("creates a network with pagerank scores") {
+     val examplerdd = RecordLoader.loadArchives(arcPath, sc)
+       .keepValidPages()
+       .flatMap(r => ExtractLinks(r.getUrl, r.getContentString))
+       .map(r => (ExtractDomain(r._1).removePrefixWWW(), ExtractDomain(r._2).removePrefixWWW()))
+       .filter(r => r._1 != "" && r._2 != "")
+     val graph = ExtractGraphX.extractGraphX(examplerdd)
+       .subgraph(epred = eTriplet => eTriplet.attr.edgeCount > 5)
+     val pRank = ExtractGraphX.runPageRankAlgorithm(graph).vertices.take(3)
+     assert(pRank(0)._2.pageRank == 0.9943090942904987)
+     assert(pRank(0)._2.weak == -649648005)
+     assert(pRank(0)._2.strong == -649648005)
+   }
+
+   test("creates a network using dynamic PR") {
+     val examplerdd = RecordLoader.loadArchives(arcPath, sc)
+       .keepValidPages()
+       .keepContent(Set("apple".r))
+       .flatMap(r => ExtractLinks(r.getUrl, r.getContentString))
+       .map(r => (ExtractDomain(r._1).removePrefixWWW(), ExtractDomain(r._2).removePrefixWWW()))
+       .filter(r => r._1 != "" && r._2 != "")
+     ExtractGraphX.dynamic = true
+     val graph = ExtractGraphX.extractGraphX(examplerdd)
+       .subgraph(epred = eTriplet => eTriplet.attr.edgeCount > 5)
+     val pRank = ExtractGraphX.runPageRankAlgorithm(graph).vertices.take(3)
+
+     assert(pRank(0)._2.pageRank == 0.9999999999999986)
+     assert(pRank(0)._2.weak == -1054421350)
+     assert(pRank(0)._2.strong == -1054421350)
+   }
+
+   after {
+     if (sc != null) {
+       sc.stop()
+     }
+     if (Files.exists(Paths.get(testVertexFile))) {
+       Try (FileUtils.deleteDirectory(new File(testVertexFile)))
+     }
+     if (Files.exists(Paths.get(testEdgesFile))) {
+       Try(FileUtils.deleteDirectory(new File(testEdgesFile)));
+     }
+   }
+ }
diff --git a/src/test/scala/io/archivesunleashed/app/PlainTextExtractorTest.scala b/src/test/scala/io/archivesunleashed/app/PlainTextExtractorTest.scala
index b7e437ea..7666faee 100644
--- a/src/test/scala/io/archivesunleashed/app/PlainTextExtractorTest.scala
+++ b/src/test/scala/io/archivesunleashed/app/PlainTextExtractorTest.scala
@@ -44,14 +44,15 @@ class PlainTextExtractorTest extends FunSuite with BeforeAndAfter {
     val df = RecordLoader.loadArchives(arcPath, sc).extractValidPagesDF()
     val rddResults = PlainTextExtractor(rdd).collect()
     val dfResults = PlainTextExtractor(df).collect()
+    val RESULTSLENGTH = 135

-    assert(rddResults.length == 135)
+    assert(rddResults.length == RESULTSLENGTH)
     assert(rddResults(0)._1 == "20080430")
     assert(rddResults(0)._2 == "www.archive.org")
     assert(rddResults(0)._3 == "http://www.archive.org/")
     assert(rddResults(0)._4 == "HTTP/1.1 200 OK Date: Wed, 30 Apr 2008 20:48:25 GMT Server: Apache/2.0.54 (Ubuntu) PHP/5.0.5-2ubuntu1.4 mod_ssl/2.0.54 OpenSSL/0.9.7g Last-Modified: Wed, 09 Jan 2008 23:18:29 GMT ETag: \"47ac-16e-4f9e5b40\" Accept-Ranges: bytes Content-Length: 366 Connection: close Content-Type: text/html; charset=UTF-8 Please visit our website at: http://www.archive.org")

-    assert(dfResults.length == 135)
+    assert(dfResults.length == RESULTSLENGTH)
     assert(dfResults(0).get(0) == "20080430")
     assert(dfResults(0).get(1) == "www.archive.org")
     assert(dfResults(0).get(2) == "http://www.archive.org/")
diff --git a/src/test/scala/io/archivesunleashed/app/WriteGraphMLTest.scala b/src/test/scala/io/archivesunleashed/app/WriteGraphMLTest.scala
index b2471e2d..1962d984 100644
--- a/src/test/scala/io/archivesunleashed/app/WriteGraphMLTest.scala
+++ b/src/test/scala/io/archivesunleashed/app/WriteGraphMLTest.scala
@@ -58,7 +58,6 @@
   test ("returns a Bool depending on pass or failure") {
     val networkrdd = sc.parallelize(network)
     val graphml = WriteGraphML(networkrdd, testFile)
-    println(graphml)
     assert(graphml == true)
     assert(WriteGraphML(networkrdd, "") == false)
   }
diff --git a/src/test/scala/io/archivesunleashed/app/WriteGraphXMLTest.scala b/src/test/scala/io/archivesunleashed/app/WriteGraphXMLTest.scala
new file mode 100644
index 00000000..5c763784
--- /dev/null
+++ b/src/test/scala/io/archivesunleashed/app/WriteGraphXMLTest.scala
@@ -0,0 +1,74 @@
+/*
+ * Archives Unleashed Toolkit (AUT):
+ * An open-source platform for analyzing web archives.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.archivesunleashed.app
+
+import java.io.File
+import java.nio.file.{Files, Paths}
+
+import org.apache.spark.{SparkConf, SparkContext}
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{BeforeAndAfter, FunSuite}
+import org.apache.spark.graphx._
+
+import scala.io.Source
+
+@RunWith(classOf[JUnitRunner])
+class WriteGraphXMLTest extends FunSuite with BeforeAndAfter{
+  private var sc: SparkContext = _
+  private val master = "local[4]"
+  private val appName = "example-spark"
+  private val network = Seq(("Source1", "Destination1"),
+    ("Source2", "Destination2"),
+    ("Source3", "Destination3"))
+  private val testFile = "temporaryTestFile.txt"
+
+  before {
+    val conf = new SparkConf()
+      .setMaster(master)
+      .setAppName(appName)
+    conf.set("spark.driver.allowMultipleContexts", "true");
+    sc = new SparkContext(conf)
+  }
+
+  test("creates the file") {
+    val networkrdd = ExtractGraphX.extractGraphX(sc.parallelize(network))
+    val pRank = ExtractGraphX.runPageRankAlgorithm(networkrdd)
+    WriteGraphXML(pRank, testFile)
+    assert(Files.exists(Paths.get(testFile)) == true)
+    val lines = Source.fromFile(testFile).getLines.toList
+    assert(lines(0) == """<?xml version="1.0" encoding="UTF-8"?>""")
+    assert(lines(13) == """<nodes>""")
+  }
+
+  test ("returns a Bool depending on pass or failure") {
+    val networkrdd = ExtractGraphX.extractGraphX(sc.parallelize(network))
+    val pRank = ExtractGraphX.runPageRankAlgorithm(networkrdd)
+    val graphml = WriteGraphXML(pRank, testFile)
+    assert(graphml == true)
+    assert(WriteGraphXML(pRank, "") == false)
+  }
+
+  after {
+    if (sc != null) {
+      sc.stop()
+    }
+    if (Files.exists(Paths.get(testFile))) {
+      new File(testFile).delete()
+    }
+  }
+}
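
Finally, a hedged sketch for the ExtractPopularImages constants introduced above (placeholder paths; the MIN_WIDTH and MIN_HEIGHT defaults apply when the size arguments are omitted):

import io.archivesunleashed._
import io.archivesunleashed.app.ExtractPopularImages

// Rank images by how often identical bytes were captured; keep the top 20.
val records = RecordLoader.loadArchives("/path/to/example.arc.gz", sc)
val popular = ExtractPopularImages(records, 20, sc)
popular.saveAsTextFile("popular-images")
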