From 40421806b563eb4466f3e44a43e3c602739ce896 Mon Sep 17 00:00:00 2001 From: Gursimran Singh Date: Thu, 21 Nov 2019 16:24:35 -0500 Subject: [PATCH] Add "Extract popular images" DataFrame implementation (#382). - Add tests for ExtractPopularImagesDF - Rename ExtractPopularImages to ExtractPopularImagesRDD - Addresses #223 --- .../app/ExtractPopularImagesDF.scala | 55 ++++++++++++++++++ ...es.scala => ExtractPopularImagesRDD.scala} | 2 +- .../app/ExtractPopularImagesDFTest.scala | 56 +++++++++++++++++++ ...cala => ExtractPopularImagesRDDTest.scala} | 6 +- 4 files changed, 115 insertions(+), 4 deletions(-) create mode 100644 src/main/scala/io/archivesunleashed/app/ExtractPopularImagesDF.scala rename src/main/scala/io/archivesunleashed/app/{ExtractPopularImages.scala => ExtractPopularImagesRDD.scala} (98%) create mode 100644 src/test/scala/io/archivesunleashed/app/ExtractPopularImagesDFTest.scala rename src/test/scala/io/archivesunleashed/app/{ExtractPopularImagesTest.scala => ExtractPopularImagesRDDTest.scala} (89%) diff --git a/src/main/scala/io/archivesunleashed/app/ExtractPopularImagesDF.scala b/src/main/scala/io/archivesunleashed/app/ExtractPopularImagesDF.scala new file mode 100644 index 00000000..585ef1f9 --- /dev/null +++ b/src/main/scala/io/archivesunleashed/app/ExtractPopularImagesDF.scala @@ -0,0 +1,55 @@ +/* + * Copyright © 2017 The Archives Unleashed Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package io.archivesunleashed.app
+
+import io.archivesunleashed.ArchiveRecord
+import org.apache.spark.{RangePartitioner, SparkContext}
+import org.apache.spark.sql.functions.{desc,first}
+import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
+
+/** Extract most popular images from a Data Frame. */
+object ExtractPopularImagesDF {
+  val MIN_WIDTH: Int = 30
+  val MIN_HEIGHT: Int = 30
+
+  /** Extracts the n most popular images from a DataFrame within a given size range.
+    *
+    * Popularity is the number of rows in `d` that share the same md5.
+    *
+    * @param d DataFrame of images; must expose url, md5, width and height columns
+    * @param limit maximum number of rows (most popular images) in the output
+    * @param minWidth minimum image width, inclusive
+    * @param minHeight minimum image height, inclusive
+    * @return Dataset[Row] with schema (url, count), ordered by count descending
+    */
+  def apply(d: DataFrame, limit: Int, minWidth: Int = MIN_WIDTH, minHeight: Int = MIN_HEIGHT): Dataset[Row] = {
+    // Filter before projecting (select drops width/height); d(...) needs no SparkSession.
+    val candidates = d
+      .filter(d("width") >= minWidth && d("height") >= minHeight)
+      .select("url", "md5")
+
+    // Number of occurrences of each distinct image, keyed by content hash.
+    val occurrences = candidates.groupBy("md5").count()
+
+    // Collapse to one row per md5; which URL first() keeps for a hash is not guaranteed.
+    candidates.join(occurrences, "md5")
+      .groupBy("md5")
+      .agg(first("url").as("url"), first("count").as("count"))
+      .select("url", "count")
+      .orderBy(desc("count"))
+      .limit(limit)
+  }
+}
diff --git a/src/main/scala/io/archivesunleashed/app/ExtractPopularImages.scala b/src/main/scala/io/archivesunleashed/app/ExtractPopularImagesRDD.scala
similarity index 98%
rename from src/main/scala/io/archivesunleashed/app/ExtractPopularImages.scala
rename to src/main/scala/io/archivesunleashed/app/ExtractPopularImagesRDD.scala
index c3d670ee..1b172b52 100644
--- a/src/main/scala/io/archivesunleashed/app/ExtractPopularImages.scala
+++ b/src/main/scala/io/archivesunleashed/app/ExtractPopularImagesRDD.scala
@@ -21,7 +21,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.{RangePartitioner, SparkContext}
 
 /** Extract most popular images from an RDD. 
*/ -object ExtractPopularImages { +object ExtractPopularImagesRDD { val LIMIT_MAXIMUM: Int = 500 val LIMIT_DENOMINATOR: Int = 250 val MIN_WIDTH: Int = 30 diff --git a/src/test/scala/io/archivesunleashed/app/ExtractPopularImagesDFTest.scala b/src/test/scala/io/archivesunleashed/app/ExtractPopularImagesDFTest.scala new file mode 100644 index 00000000..932798b4 --- /dev/null +++ b/src/test/scala/io/archivesunleashed/app/ExtractPopularImagesDFTest.scala @@ -0,0 +1,56 @@ +/* + * Copyright © 2017 The Archives Unleashed Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package io.archivesunleashed.app
+
+import com.google.common.io.Resources
+import io.archivesunleashed.RecordLoader
+import org.apache.spark.{SparkConf, SparkContext}
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{BeforeAndAfter, FunSuite}
+
+/** Checks ExtractPopularImagesDF against the example ARC file on the test classpath. */
+@RunWith(classOf[JUnitRunner])
+class ExtractPopularImagesDFTest extends FunSuite with BeforeAndAfter {
+  private val arcPath = Resources.getResource("arc/example.arc.gz").getPath
+  private var sc: SparkContext = _
+  private val master = "local[4]"
+  private val appName = "example-spark"
+
+  before {
+    val conf = new SparkConf()
+      .setMaster(master)
+      .setAppName(appName)
+    conf.set("spark.driver.allowMultipleContexts", "true")
+    sc = new SparkContext(conf)
+  }
+
+  test("extracts popular images") {
+    val highTest = 507
+    val exampledf = RecordLoader.loadArchives(arcPath, sc).images()
+    val imagesLowLimit = ExtractPopularImagesDF(exampledf, 3)
+    val imagesHighLimit = ExtractPopularImagesDF(exampledf, highTest)
+    val response = "1"
+    // Column 1 of the first (url, count) row is the count of the top-ranked image.
+    assert (imagesLowLimit.take(1)(0)(1).toString == response)
+    assert (imagesHighLimit.take(1)(0)(1).toString == response)
+  }
+
+  after {
+    if (sc != null) {
+      sc.stop()
+    }
+  }
+}
diff --git a/src/test/scala/io/archivesunleashed/app/ExtractPopularImagesTest.scala b/src/test/scala/io/archivesunleashed/app/ExtractPopularImagesRDDTest.scala
similarity index 89%
rename from src/test/scala/io/archivesunleashed/app/ExtractPopularImagesTest.scala
rename to src/test/scala/io/archivesunleashed/app/ExtractPopularImagesRDDTest.scala
index 7d816f74..38ffd509 100644
--- a/src/test/scala/io/archivesunleashed/app/ExtractPopularImagesTest.scala
+++ b/src/test/scala/io/archivesunleashed/app/ExtractPopularImagesRDDTest.scala
@@ -23,7 +23,7 @@ import org.scalatest.junit.JUnitRunner
 import org.scalatest.{BeforeAndAfter, FunSuite}
 
@RunWith(classOf[JUnitRunner]) -class ExtractPopularImagesTest extends FunSuite with BeforeAndAfter { +class ExtractPopularImagesRDDTest extends FunSuite with BeforeAndAfter { private val arcPath = Resources.getResource("arc/example.arc.gz").getPath private var sc: SparkContext = _ private val master = "local[4]" @@ -42,8 +42,8 @@ class ExtractPopularImagesTest extends FunSuite with BeforeAndAfter { test("extracts popular images") { val highTest = 507 val examplerdd = RecordLoader.loadArchives(arcPath, sc) - val imagesLowLimit = ExtractPopularImages(examplerdd, 3, sc) - val imagesHighLimit = ExtractPopularImages(examplerdd, highTest, sc) + val imagesLowLimit = ExtractPopularImagesRDD(examplerdd, 3, sc) + val imagesHighLimit = ExtractPopularImagesRDD(examplerdd, highTest, sc) val response = Array("1\thttp://www.archive.org/images/books-small.jpg", "1\thttp://i.creativecommons.org/l/by-sa/3.0/88x31.png", "1\thttp://www.archive.org/images/blendbar.jpg")