Commit

Add "Extract popular images" DataFrame implementation (#382).
- Add tests for ExtractPopularImagesDF
- Rename ExtractPopularImages to ExtractPopularImagesRDD
- Addresses #223
Gursimran Singh authored and ruebot committed Nov 21, 2019
1 parent c4eaca9 commit 4042180
Showing 4 changed files with 115 additions and 4 deletions.
@@ -0,0 +1,55 @@
/*
* Copyright © 2017 The Archives Unleashed Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.archivesunleashed.app

import io.archivesunleashed.ArchiveRecord
import org.apache.spark.{RangePartitioner, SparkContext}
import org.apache.spark.sql.functions.{desc,first}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

/** Extract most popular images from a Data Frame. */
object ExtractPopularImagesDF {
  val MIN_WIDTH: Int = 30
  val MIN_HEIGHT: Int = 30

  /** Extracts the <i>n</i> most popular images from a Data Frame within a given size range.
    *
    * @param d Data frame obtained from RecordLoader
    * @param limit number of most popular images in the output
    * @param minWidth of image
    * @param minHeight of image
    * @return Dataset[Row], where the schema is (url, count)
    */
  def apply(d: DataFrame, limit: Int, minWidth: Int = MIN_WIDTH, minHeight: Int = MIN_HEIGHT): Dataset[Row] = {

    val spark = SparkSession.builder().master("local").getOrCreate()
    // scalastyle:off
    import spark.implicits._
    // scalastyle:on

    val df = d.select($"url",$"md5")
      .filter(($"width") >= minWidth && ($"height") >= minHeight)

    val count = df.groupBy("md5").count()

    df.join(count,"md5")
      .groupBy("md5")
      .agg(first("url").as("url"), first("count").as("count"))
      .select("url","count")
      .orderBy(desc("count"))
      .limit(limit)
  }
}
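The new object filters out images below the minimum width and height, counts how often each remaining image appears (keyed by its MD5 hash), keeps one representative URL per hash, and returns a (url, count) Dataset ordered by descending count. A minimal usage sketch, e.g. from spark-shell where sc is the active SparkContext, is below; the archive path is a placeholder and not part of this commit.

import io.archivesunleashed._
import io.archivesunleashed.app.ExtractPopularImagesDF

// Build the images DataFrame (url, md5, width, height, ...) from an archive.
val images = RecordLoader.loadArchives("/path/to/example.arc.gz", sc).images()

// Ten most frequent images, using the default 30x30 minimum dimensions.
val popular = ExtractPopularImagesDF(images, 10)
popular.show(10, false)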
@@ -21,7 +21,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.{RangePartitioner, SparkContext}

/** Extract most popular images from an RDD. */
-object ExtractPopularImages {
+object ExtractPopularImagesRDD {
  val LIMIT_MAXIMUM: Int = 500
  val LIMIT_DENOMINATOR: Int = 250
  val MIN_WIDTH: Int = 30
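For comparison, the RDD entry point keeps its original behaviour under the new ExtractPopularImagesRDD name and yields tab-separated "count\turl" strings rather than a DataFrame. A minimal sketch under the same assumptions (placeholder path, spark-shell sc):

import io.archivesunleashed._
import io.archivesunleashed.app.ExtractPopularImagesRDD

// Load the raw archive records and rank images by frequency.
val records = RecordLoader.loadArchives("/path/to/example.arc.gz", sc)
val popularRDD = ExtractPopularImagesRDD(records, 10, sc)

// Each element is a "count\turl" string, most frequent first.
popularRDD.take(10).foreach(println)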
@@ -0,0 +1,56 @@
/*
* Copyright © 2017 The Archives Unleashed Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.archivesunleashed.app

import com.google.common.io.Resources
import io.archivesunleashed.RecordLoader
import org.apache.spark.{SparkConf, SparkContext}
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
import org.scalatest.{BeforeAndAfter, FunSuite}

@RunWith(classOf[JUnitRunner])
class ExtractPopularImagesDFTest extends FunSuite with BeforeAndAfter {
  private val arcPath = Resources.getResource("arc/example.arc.gz").getPath
  private var sc: SparkContext = _
  private val master = "local[4]"
  private val appName = "example-spark"
  private val testVertexFile = "temporaryTestVertexDir"
  private val testEdgesFile = "temporaryTestEdgesDir"

  before {
    val conf = new SparkConf()
      .setMaster(master)
      .setAppName(appName)
    conf.set("spark.driver.allowMultipleContexts", "true");
    sc = new SparkContext(conf)
  }

test("extracts popular images") {
val highTest = 507
val exampledf = RecordLoader.loadArchives(arcPath, sc).images()
val imagesLowLimit = ExtractPopularImagesDF(exampledf, 3)
val imagesHighLimit = ExtractPopularImagesDF(exampledf, highTest)
val response = "1"
assert (imagesLowLimit.take(1)(0)(1).toString == response)
assert (imagesHighLimit.take(1)(0)(1).toString == response)
}
after {
if (sc != null) {
sc.stop()
}
}
}
@@ -23,7 +23,7 @@ import org.scalatest.junit.JUnitRunner
import org.scalatest.{BeforeAndAfter, FunSuite}

@RunWith(classOf[JUnitRunner])
-class ExtractPopularImagesTest extends FunSuite with BeforeAndAfter {
+class ExtractPopularImagesRDDTest extends FunSuite with BeforeAndAfter {
  private val arcPath = Resources.getResource("arc/example.arc.gz").getPath
  private var sc: SparkContext = _
  private val master = "local[4]"
@@ -42,8 +42,8 @@ class ExtractPopularImagesTest extends FunSuite with BeforeAndAfter {
test("extracts popular images") {
val highTest = 507
val examplerdd = RecordLoader.loadArchives(arcPath, sc)
val imagesLowLimit = ExtractPopularImages(examplerdd, 3, sc)
val imagesHighLimit = ExtractPopularImages(examplerdd, highTest, sc)
val imagesLowLimit = ExtractPopularImagesRDD(examplerdd, 3, sc)
val imagesHighLimit = ExtractPopularImagesRDD(examplerdd, highTest, sc)
val response = Array("1\thttp://www.archive.org/images/books-small.jpg",
"1\thttp://i.creativecommons.org/l/by-sa/3.0/88x31.png",
"1\thttp://www.archive.org/images/blendbar.jpg")
