
Save images from dataframe to disk #234

Merged: 6 commits, May 25, 2018

Conversation

@jwli229 (Contributor) commented May 22, 2018

GitHub issue(s):

What does this Pull Request do?

  • given an image DataFrame (the output of ExtractImageDetailsDF), save the images to disk

How should this be tested?

Start spark shell with spark-shell --jars target/aut-0.16.1-SNAPSHOT-fatjar.jar
Command

import io.archivesunleashed._
import io.archivesunleashed.matchbox._
val df = RecordLoader.loadArchives("example.arc.gz", sc).extractImageDetailsDF();
val res = df.select($"bytes").orderBy(desc("bytes")).limit(3)
SaveImages(res, "bytes", "foo")

Some saved images (previews attached in the original PR): foo0, foo1, foo2

Interested parties

@ianmilligan1 @lintool @ruebot

Review thread on the diff (df/package.scala):

 * @param fileName the name of the file to save the images to (without extension)
 * e.g. fileName = "foo" => images are saved as foo0.jpg, foo1.jpg
 */
def apply(df: DataFrame, bytesColumnName: String, fileName: String) = {
A reviewer (Member) commented:

Spacing is off. Are you using tabs or spaces?

@codecov (bot) commented May 22, 2018

Codecov Report

Merging #234 into master will increase coverage by 0.57%.
The diff coverage is 84.21%.


@@            Coverage Diff             @@
##           master     #234      +/-   ##
==========================================
+ Coverage   60.07%   60.65%   +0.57%     
==========================================
  Files          39       39              
  Lines         774      793      +19     
  Branches      137      139       +2     
==========================================
+ Hits          465      481      +16     
- Misses        268      269       +1     
- Partials       41       43       +2
Impacted Files Coverage Δ
...c/main/scala/io/archivesunleashed/df/package.scala 86.36% <84.21%> (-13.64%) ⬇️

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 496cd1b...a3f923b.

@ruebot (Member) commented May 22, 2018

@JWZ2018 you got a test for this one?

@lintool (Member) commented May 22, 2018

How about we refactor so we can chain directly off the DF, e.g.,

val df = RecordLoader.loadArchives("example.arc.gz", sc).extractImageDetailsDF();
val res = df.select($"bytes").orderBy(desc("bytes")).limit(3).saveToDisk("bytes", "foo")

@lintool (Member) commented May 22, 2018

@ianmilligan1 want to try running this on a large-ish collection to see if it scales?

@lintool (Member) commented May 23, 2018

Per @ruebot please write a test case for this. Extract an image from the resource WARC, write it to disk, read it back and diff.

Also, thoughts on refactoring API to .saveToDisk("bytes", "foo")?

@ianmilligan1 (Member) commented

For saveToDisk (or currently SaveImages) can we:

  • support paths? I often want the images to dump out to a certain directory. I've specified paths but I don't see the images appearing.

Right now where do the images go? I see a derby.log and metastore_db appearing in the directory I'm running from but no result files?

@jwli229 (Contributor, Author) commented May 23, 2018

@lintool @ruebot I'm working on a concurrency issue with the number suffix in the file name. We want to name the files filename<uniqueid>.
Here's what I thought of so far:

  1. using an incrementing counter (the current version in the PR). This counter is not coordinated across partitions (each partition gets its own copy and counts from zero), so different partitions generate the same suffixes and the files end up overwriting each other.
  2. forcing all the rows through a single partition, but this would have performance issues.
  3. using Spark SQL Window functions, specifically the row_number() function, but the numbering is still per-partition, so same problem as 1.
  4. using both the partition number and the row number within that partition to generate the id, but I didn't find a way to get the partition number from the Window functions.
  5. using a randomly generated suffix, but then it's hard to know what the files were named in the program after running df.saveToDisk.

Any ideas?
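One possibility for option 4 without Window functions (a sketch, not necessarily what this PR ends up doing): Spark's built-in monotonically_increasing_id() packs the partition number into the upper bits of the id and a per-partition record counter into the lower bits, so the generated ids are unique across partitions, though not consecutive. Using the res DataFrame from the test snippet above:

import org.apache.spark.sql.functions.monotonically_increasing_id

// Attach a cross-partition-unique id to each row; the id could then be
// used as the filename suffix in place of a shared counter.
val withId = res.withColumn("id", monotonically_increasing_id())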

@jwli229 (Contributor, Author) commented May 23, 2018

@ianmilligan1
The current one should support paths too.
e.g. if you run SaveImages(res, "bytes", "/tmp/foo"), the images should be saved under /tmp/ with paths /tmp/foo0.png, /tmp/foo1.png, etc.
The refactoring is coming soon after I resolve the concurrency issue mentioned above.

@greebie (Contributor) commented May 23, 2018

Possibly relevant: We've used an MD5 hash in the past to create unique ids from concurrent files (see WriteGraphML). Create a hashed id out of specific characteristics (filesize etc.) and use unique() to remove duplicates?
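A minimal sketch of that hashing idea (assuming the image bytes are available as an Array[Byte]; the helper name md5Hex is made up here):

import java.security.MessageDigest

// Hash the image bytes to get a deterministic filename suffix:
// identical images hash to the same name, which both avoids the
// cross-partition counter problem and deduplicates repeated images.
def md5Hex(bytes: Array[Byte]): String =
  MessageDigest.getInstance("MD5").digest(bytes).map("%02x".format(_)).mkString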

@ianmilligan1 (Member) commented

Ok thanks @JWZ2018 – turns out the WARC I was using didn't have images, or something? I'm trying it out on a big collection now. 😄

@ianmilligan1 (Member) commented

(sorry on the erroneous close – stupid desktop trackpad)

@ianmilligan1 (Member) commented

Ok I ran the following script on a large collection:

import io.archivesunleashed._
import io.archivesunleashed.matchbox._
val df = RecordLoader.loadArchives("/mnt/vol1/data_sets/cpp/cpp_warcs_accession_01/partner.archive-it.org/cgi-bin/getarcs.pl/*.gz", sc).extractImageDetailsDF();
val res = df.select($"bytes").orderBy(desc("bytes")).limit(10)
SaveImages(res, "bytes", "/mnt/vol1/derivative_data/cpp_image/export/foobar")

It produced six files:

-rw-rw-r-- 1 i2millig 1.1M May 23 12:45 foobar0.png
-rw-rw-r-- 1 i2millig 173K May 23 12:45 foobar1.png
-rw-rw-r-- 1 i2millig 117K May 23 12:45 foobar2.png
-rw-rw-r-- 1 i2millig 703K May 23 12:45 foobar3.png
-rw-rw-r-- 1 i2millig 2.5M May 23 12:45 foobar4.png
-rw-rw-r-- 1 i2millig 1.7K May 23 12:45 foobar5.png

Only question: what does limit(10) do? Why am I only getting six images?

@jwli229 (Contributor, Author) commented May 23, 2018

@ianmilligan1
limit(10) limits the results to 10 rows. It can be removed.
Only 6 images were saved, likely due to the concurrency issue mentioned above (files being overwritten).
I've updated this PR to fix that issue and also refactor to saveToDisk.
You can use

import io.archivesunleashed._
import io.archivesunleashed.matchbox._
val df = RecordLoader.loadArchives("example.arc.gz", sc).extractImageDetailsDF();
val res = df.select($"bytes").orderBy(desc("bytes")).limit(3).saveToDisk("bytes", "foo")

Again the limit(3) can be removed but you may get a lot of images on disk.

@jwli229 (Contributor, Author) commented May 23, 2018

@lintool the test is coming soon. When I write the image to disk and read it back the bytes are not the same. I'm looking into why that is.
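One plausible cause (an assumption, not confirmed in this thread): if the image is written back out through an encoder such as javax.imageio.ImageIO, it gets re-compressed, so the on-disk bytes differ from the originals even when the decoded pixels match. Writing the raw bytes directly round-trips exactly; a sketch with a made-up helper name:

import java.io.{File, FileOutputStream}

// Write the original bytes untouched so that reading the file back
// yields a byte-for-byte match with the source column.
def writeRaw(bytes: Array[Byte], path: String): Unit = {
  val out = new FileOutputStream(new File(path))
  try out.write(bytes) finally out.close()
}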

@jwli229 (Contributor, Author) commented May 23, 2018

Test added

@jwli229 (Contributor, Author) commented May 25, 2018

@lintool @ruebot @ianmilligan1
Can you review this again?

@ianmilligan1 (Member) commented

Am testing a large export right now 👍

@ianmilligan1 (Member) commented

Running

import io.archivesunleashed._
import io.archivesunleashed.matchbox._
val df = RecordLoader.loadArchives("/mnt/vol1/data_sets/cpp/cpp_warcs_accession_01/partner.archive-it.org/cgi-bin/getarcs.pl/ARCHIVEIT-227-QUARTERLY-XUGECV-20091219231219-00049-crawling06.us.archive.org-8091.warc.gz", sc).extractImageDetailsDF();
val res = df.select($"bytes").orderBy(desc("bytes")).saveToDisk("bytes", "/mnt/vol1/derivative_data/cpp_image/big_export/cpp_image_")

as per suggested syntax above leads to this error for me:

<console>:27: error: value saveToDisk is not a member of org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]
       val res = df.select($"bytes").orderBy(desc("bytes")).saveToDisk("bytes", "/mnt/vol1/derivative_data/cpp_image/big_export/cpp_image_")

@jwli229 (Contributor, Author) commented May 25, 2018

@ianmilligan1
Did you repull this branch and rebuild?
I tried running the same code and it finished, but it seems like there are no images in that particular set.
I tried this with a different set:

import io.archivesunleashed._
import io.archivesunleashed.matchbox._
val df = RecordLoader.loadArchives("/mnt/vol1/data_sets/cpp/cpp_warcs_accession_01/partner.archive-it.org/cgi-bin/getarcs.pl/ARCHIVEIT-227-20090502145916-00000-crawling01.us.archive.org.warc.gz", sc).extractImageDetailsDF();
val res = df.select($"bytes").orderBy(desc("bytes")).saveToDisk("bytes", "/mnt/vol1/derivative_data/cpp_image/big_export/cpp_image_")

I got a permission denied error on writing to that directory but I saw the images were generated.

@ianmilligan1 (Member) commented

Thanks @JWZ2018 – working now! And am exploring the images, very interesting and useful stuff. Gives a good bird's-eye view of what's in a given WARC.

i.e.
[screenshot attached in the original PR: screen shot 2018-05-25 at 3 04 47 pm]

@ianmilligan1 (Member) left a review comment:

From a usability perspective, works very well!

Review thread on the diff:

 * Given a dataframe, serializes the images and saves to disk
 * @param df the input dataframe
 */
implicit class SaveImage(df: DataFrame) {
@lintool (Member) commented:

To reduce confusion I think this should be in df not matchbox.
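For reference, a minimal sketch of how such an implicit class makes saveToDisk chainable off any DataFrame. The body here is a simplified stand-in, not the PR's actual implementation: it collects to the driver, which sidesteps the concurrency issue discussed above but would not scale to large result sets, and it assumes the named column holds raw Array[Byte] image data.

import java.nio.file.{Files, Paths}
import org.apache.spark.sql.DataFrame

implicit class SaveImage(df: DataFrame) {
  // Pull the byte column to the driver and write each image with a
  // row-index suffix, e.g. foo0.png, foo1.png, ...
  def saveToDisk(bytesColumnName: String, fileName: String): Unit = {
    df.select(bytesColumnName).collect().zipWithIndex.foreach {
      case (row, i) =>
        val bytes = row.getAs[Array[Byte]](bytesColumnName)
        Files.write(Paths.get(s"$fileName$i.png"), bytes)
    }
  }
}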

@lintool (Member) commented May 25, 2018

I have a comment, but then I think we can merge?
@ruebot ?

@ruebot (Member) commented May 25, 2018

@lintool works for me.

@jwli229 (Contributor, Author) commented May 25, 2018

@lintool moved to df
