Refactor loadArchives() function #257
Conversation
Great, thanks @borislin. Could you provide your script here showing how you used the refactored function? I can kick the tires on that soon.
* @param fs filesystem
* @param prefix prefix of archive files
* @param numFiles number of archive files
* @param maxSize maximu size of archive files
"maximu": typo.
* @param dir the path to the directory containing archive files
* @param fs filesystem
* @param prefix prefix of archive files
* @param numFiles number of archive files
Can you explain how numFiles and maxSize interact? Both shouldn't be set, right?
numFiles is used for debugging only. For instance, we can do a sanity check by running on only 10 files by setting numFiles = 10. We can also limit the maximum file size, since I found out that large files are causing the heartbeat timeout issue when I run AUT on those ARC files.
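To make that concrete, here's a minimal sketch of the kind of sanity-check run described above. It is an illustration only: the directory path and the 500 MB cap are placeholders, and it assumes loadArchives is reached through the RecordLoader object in a spark-shell session, as elsewhere in aut.

import io.archivesunleashed._

// Sanity check: load only the first 10 archive files from the directory
// and skip anything larger than 500 MB (placeholder values).
val records = RecordLoader.loadArchives("/path/to/arcs", sc,
  numFiles = Some(10),
  maxSize = Some(500L))
records.count()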
We ran into that issue a few weeks ago with auk in production. We run every job with the heartbeat flag now: https://github.com/archivesunleashed/auk/blob/master/app/jobs/collections_spark_job.rb#L51
@ruebot but how do you determine the correct value for spark_heartbeat_interval? By trial-and-error?
FWIW, I determined it by trial-and-error. Since it only affects a small number of (W)ARCs, putting the number relatively high was fine in our particular application. We have it set to 600s by default right now.
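For reference, a sketch of how that interval can be set when building the context. spark.executor.heartbeatInterval is the standard Spark property (presumably what auk's spark_heartbeat_interval flag ends up setting), and spark.network.timeout must stay larger than it; the 600s value mirrors the default mentioned above, everything else is illustrative.

import org.apache.spark.{SparkConf, SparkContext}

// Raise the executor heartbeat interval to 600s for jobs on large (W)ARCs.
// spark.network.timeout must remain larger than the heartbeat interval.
val conf = new SparkConf()
  .setAppName("aut job")
  .set("spark.executor.heartbeatInterval", "600s")
  .set("spark.network.timeout", "1200s")
val sc = new SparkContext(conf)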
var files = indexFiles.filter(f => {
  val path = f.getPath.getName
  val fileSize = fs.getContentSummary(f.getPath).getLength
  f.isFile && (prefix.isEmpty || path.startsWith(prefix.get)) && path.endsWith(".gz") && (maxSize.isEmpty || fileSize <= maxSize.get * 1000000)
line too long... add wrapping?
Also, maxSize.get * 1000000 is a bit janky - do Int.MaxValue? https://www.scala-lang.org/api/2.12.0/scala/Int$.html
What I mean here is to convert MB to bytes. I've added a helper function in the new commit.
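Roughly what the wrapped predicate might look like once the conversion goes through the mbToBytes helper added in the new commit (the exact formatting is a guess, the logic is unchanged from the diff above):

// Same predicate as above, wrapped for the line-length check, with the
// hard-coded 1000000 replaced by the mbToBytes helper.
var files = indexFiles.filter(f => {
  val path = f.getPath.getName
  val fileSize = fs.getContentSummary(f.getPath).getLength
  f.isFile &&
    (prefix.isEmpty || path.startsWith(prefix.get)) &&
    path.endsWith(".gz") &&
    (maxSize.isEmpty || fileSize <= mbToBytes(maxSize.get))
})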
sc.newAPIHadoopFile(path, classOf[ArchiveRecordInputFormat], classOf[LongWritable], classOf[ArchiveRecordWritable])
  .filter(r => (r._2.getFormat == ArchiveFormat.ARC) ||
    ((r._2.getFormat == ArchiveFormat.WARC) && r._2.getRecord.getHeader.getHeaderValue("WARC-Type").equals("response")))
def loadArchives(path: String, sc: SparkContext, format: Option[String] = None, prefix: Option[String] = None, numFiles: Option[Int] = None, maxSize : Option[Long] = None): RDD[ArchiveRecord] = {
wrap line?
import io.archivesunleashed.matchbox.ImageDetails
import io.archivesunleashed.matchbox.ExtractDate.DateComponent
// scalastyle:off underscore.import
import io.archivesunleashed.matchbox.ExtractDate.DateComponent._

Remove blank line.
val log: Logger = Logger.getLogger(getClass.getName)

/** Gets all archive files by applying filters prefix, numFiles and maxSize
Full stop at the end.
import org.apache.spark.{SerializableWritable, SparkContext}
import org.apache.spark.rdd.RDD

Remove blank line.
Codecov Report

@@            Coverage Diff             @@
##           master     #257      +/-   ##
==========================================
+ Coverage   70.35%   70.41%   +0.05%
==========================================
  Files          41       41
  Lines        1039     1058      +19
  Branches      191      193       +2
==========================================
+ Hits          731      745      +14
- Misses        242      244       +2
- Partials       66       69       +3

Continue to review full report at Codecov.
val log: Logger = Logger.getLogger(getClass.getName)

/** Convert MB to Bytes. **/
def mbToBytes(size: Long): Long = {
Doesn't look like this is covered in tests. https://codecov.io/gh/archivesunleashed/aut/pull/257/diff?src=pr&el=tree#D1-53
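A small ScalaTest sketch that would cover it; whether the helper is visible from the test scope and whether the project uses FunSuite for this are assumptions:

import org.scalatest.FunSuite

// Hypothetical coverage for the helper. The expected values assume the
// decimal conversion (1 MB = 1,000,000 bytes) implied by the original
// maxSize.get * 1000000 expression.
class MbToBytesTest extends FunSuite {
  test("mbToBytes converts megabytes to bytes") {
    assert(mbToBytes(0L) == 0L)
    assert(mbToBytes(1L) == 1000000L)
    assert(mbToBytes(500L) == 500000000L)
  }
}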
val indexFiles = fs.listStatus(dir)
var files = indexFiles.filter(f => isValidFile(f, fs, prefix, maxSize)).map(f => f.getPath)
if (numFiles.isDefined) {
  files = files.take(numFiles.get)
Needs to be tested. https://codecov.io/gh/archivesunleashed/aut/pull/257/diff?src=pr&el=tree#D1-76
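A sketch of what a test for the numFiles branch could look like; the test fixture path, the RecordLoader entry point, and the local SparkContext setup are assumptions modelled on how existing aut tests are typically written, not code from this PR:

import com.google.common.io.Resources
import io.archivesunleashed.RecordLoader
import org.apache.spark.{SparkConf, SparkContext}
import org.scalatest.{BeforeAndAfter, FunSuite}

// Hypothetical test: with numFiles = Some(1) the loader should still return
// records from a single fixture file; a stronger assertion on the number of
// files actually loaded would need a directory with several fixtures.
class LoadArchivesNumFilesTest extends FunSuite with BeforeAndAfter {
  private var sc: SparkContext = _

  before {
    val conf = new SparkConf().setMaster("local[2]").setAppName("numFiles test")
    sc = new SparkContext(conf)
  }

  test("numFiles limits how many archive files are loaded") {
    val arcPath = Resources.getResource("arc/example.arc.gz").getPath
    val records = RecordLoader.loadArchives(arcPath, sc, numFiles = Some(1))
    assert(records.count() > 0L)
  }

  after {
    if (sc != null) {
      sc.stop()
    }
  }
}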
hi @borislin - given your latest analyses, I also think we need a max size variable?
Just following up on this PR - any movement on responding to the reviews and getting this moved forward?
@ianmilligan1 @lintool is this PR superseded by #275? It's unclear which one is which for #247 -- this one or #275 -- or if it should be both.
Don't delete this branch.
Patch for #247.

What does this Pull Request do?
This PR refactors the loadArchives() function to accept the optional parameters numFiles, maxSize, format and prefix, giving us some fine-grained control over how archive files are loaded. This will greatly help us debug large collections and won't affect any existing code that calls loadArchives().

How should this be tested?
Call the refactored loadArchives() function in any of the following ways (a sketch of a driver using these call forms follows below):
loadArchives(path, sc)
loadArchives(path, sc, args.format.toOption, args.prefix.toOption, args.files.toOption, args.size.toOption)
loadArchives(path, sc, prefix = args.prefix.toOption, numFiles = args.files.toOption, maxSize = args.size.toOption)
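For anyone who wants to kick the tires, here is a minimal sketch of a driver using the second call form. It is an illustration only: the Scallop option names, the RecordLoader entry point, and the app name are assumptions based on this PR's description, not code from this PR.

import io.archivesunleashed._
import org.apache.spark.{SparkConf, SparkContext}
import org.rogach.scallop._

// Hypothetical Scallop configuration mirroring the args.*.toOption call form above.
class Conf(arguments: Seq[String]) extends ScallopConf(arguments) {
  val input = opt[String](required = true)
  val format = opt[String]()
  val prefix = opt[String]()
  val files = opt[Int]()
  val size = opt[Long]()
  verify()
}

object LoadArchivesDriver {
  def main(argv: Array[String]): Unit = {
    val args = new Conf(argv)
    val sc = new SparkContext(new SparkConf().setAppName("loadArchives smoke test"))
    // Pass the optional filters straight through to the refactored loader.
    val records = RecordLoader.loadArchives(args.input(), sc,
      args.format.toOption, args.prefix.toOption, args.files.toOption, args.size.toOption)
    println(records.count())
    sc.stop()
  }
}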
Interested parties
@lintool @ianmilligan1