Qualification tool: Error handling while processing large event logs #3714

Merged (9 commits) on Oct 8, 2021
45 changes: 27 additions & 18 deletions tools/src/main/scala/org/apache/spark/sql/rapids/tool/AppBase.scala
@@ -84,25 +84,34 @@ abstract class AppBase(
val logFiles = reader.listEventLogFiles
logFiles.foreach { file =>
Utils.tryWithResource(openEventLogInternal(file.getPath, fs)) { in =>
val lines = Source.fromInputStream(in)(Codec.UTF8).getLines().toList
// Using find as foreach with conditional to exit early if we are done.
// Do NOT use a while loop as it is much much slower.
lines.find { line =>
val isDone = try {
totalNumEvents += 1
val event = JsonProtocol.sparkEventFromJson(parse(line))
processEvent(event)
try {
val lines = Source.fromInputStream(in)(Codec.UTF8).getLines().toList
gerashegalov (Collaborator) commented on Sep 30, 2021:

The root cause of the OOM is likely the fact that we materialize the whole file on the heap. Remove toList to keep it a line iterator.
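
For illustration (not part of the PR): a minimal standalone sketch of the difference being discussed, using a hypothetical plain-text log path. getLines().toList forces every line of the file onto the heap before processing starts, while iterating over getLines() directly keeps only the current line in memory.

import scala.io.{Codec, Source}

object LineIteratorSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical log path; any large text file works for the comparison.
    val path = args.headOption.getOrElse("event_log.txt")

    // Materializes the whole file on the heap before anything is processed:
    // val allLines = Source.fromFile(path)(Codec.UTF8).getLines().toList

    // Lazy alternative: only the current line is held in memory at a time.
    val source = Source.fromFile(path)(Codec.UTF8)
    try {
      source.getLines().foreach { line =>
        // stand-in for per-event processing
        println(line.length)
      }
    } finally {
      source.close()
    }
  }
}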

Collaborator Author commented:

Thanks @gerashegalov for taking a look. I was still seeing the OOM error even after removing toList, since we are reading one event log per thread. I am wrapping the checks at the thread level now.
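
For illustration (not part of the PR): a rough sketch of the pattern described here, with hypothetical names, since the actual change is spread across the diff below. The whole per-file read is wrapped so that an OutOfMemoryError from one oversized log produces an actionable message before being rethrown.

object PerLogErrorHandlingSketch {
  // readAndProcess is a stand-in for the real per-file event processing.
  def processOneEventLog(path: String)(readAndProcess: String => Unit): Unit = {
    try {
      readAndProcess(path)
    } catch {
      case oom: OutOfMemoryError =>
        // Give the user a clear hint, then rethrow so the failure stays visible.
        System.err.println(s"OOM error while processing large file $path. Increase heap size.")
        throw oom
      case e: Exception =>
        // Other failures skip just this log instead of aborting the whole run.
        System.err.println(s"Unexpected exception processing log $path, skipping! ($e)")
    }
  }
}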

Collaborator commented:

There are multiple things that can cause OOM; Gera is just saying this is potentially one of them, and it depends on the file sizes. We should file a separate follow-up if we want to optimize it more.

Collaborator commented:

Can you post the stack trace? It might give us a hint about what to look for.

And I'd get a heap dump with -XX:+HeapDumpOnOutOfMemoryError.
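
For illustration (not part of the PR): -XX:+HeapDumpOnOutOfMemoryError is a JVM option, so it has to be passed when the JVM is launched (for example on the java command line), optionally together with -XX:HeapDumpPath to control where the .hprof file lands. A small, hypothetical Scala check to confirm the flag actually took effect in the running process:

import java.lang.management.ManagementFactory
import scala.collection.JavaConverters._

object HeapDumpFlagCheck {
  def main(args: Array[String]): Unit = {
    // JVM arguments this process was started with,
    // e.g. -Xmx8g -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/dumps
    val jvmArgs = ManagementFactory.getRuntimeMXBean.getInputArguments.asScala
    val enabled = jvmArgs.exists(_.contains("+HeapDumpOnOutOfMemoryError"))
    println(s"Heap dump on OOM enabled: $enabled")
  }
}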

Collaborator Author commented:

Below is the stack trace. It looks like it happens while reading the file from the input stream.

Dumping heap to java_pid31797.hprof ...
Heap dump file created [427465673 bytes in 1.749 secs]
21/09/30 15:15:32 ERROR Profiler: OOM error while processing large file file:/home/nartal/CPU_runs/application_1630450374626_0001_1.Increase heap size.
java.lang.OutOfMemoryError: GC overhead limit exceeded
	at java.util.Arrays.copyOf(Arrays.java:3332)
	at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:124)
	at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:596)
	at java.lang.StringBuilder.append(StringBuilder.java:190)
	at java.io.BufferedReader.readLine(BufferedReader.java:358)
	at java.io.BufferedReader.readLine(BufferedReader.java:389)
	at scala.io.BufferedSource$BufferedLineIterator.hasNext(BufferedSource.scala:74)
	at scala.collection.Iterator.foreach(Iterator.scala:941)
	at scala.collection.Iterator.foreach$(Iterator.scala:941)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
	at scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:189)
	at scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:47)
	at scala.collection.TraversableOnce.to(TraversableOnce.scala:313)
	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:311)
	at scala.collection.AbstractIterator.to(Iterator.scala:1429)
	at scala.collection.TraversableOnce.toList(TraversableOnce.scala:297)
	at scala.collection.TraversableOnce.toList$(TraversableOnce.scala:297)
	at scala.collection.AbstractIterator.toList(Iterator.scala:1429)
	at org.apache.spark.sql.rapids.tool.AppBase.$anonfun$processEvents$4(AppBase.scala:87)
	at org.apache.spark.sql.rapids.tool.AppBase$$Lambda$203/903990770.apply(Unknown Source)
	at org.apache.spark.util.Utils$.tryWithResource(Utils.scala:2611)
	at org.apache.spark.sql.rapids.tool.AppBase.$anonfun$processEvents$2(AppBase.scala:86)
	at org.apache.spark.sql.rapids.tool.AppBase$$Lambda$199/136377256.apply(Unknown Source)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at org.apache.spark.sql.rapids.tool.AppBase.processEvents(AppBase.scala:85)
	at org.apache.spark.sql.rapids.tool.profiling.ApplicationInfo.<init>(ApplicationInfo.scala:240)
	at com.nvidia.spark.rapids.tool.profiling.Profiler.com$nvidia$spark$rapids$tool$profiling$Profiler$$createApp(Profiler.scala:248)
	at com.nvidia.spark.rapids.tool.profiling.Profiler$ProfileProcessThread$1.run(Profiler.scala:205)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)

I got the heap dump by including the argument you specified and opened it with jhat. There's too much info in there. Could you please give me some pointers on where I should be looking?

Collaborator commented:

It looks like you still have toList in there.

VisualVM (part of the JDK) and Eclipse MAT are more user-friendly for analyzing the heap dump. You want to look for objects with a large "retained" heap size.

Collaborator Author commented:

Thanks Gera. Filed a follow-on issue to improve memory consumption: https://github.com/NVIDIA/spark-rapids/issues/3727
This PR is mostly to identify the OOM and throw a meaningful error so that users can increase the heap size.

// Using find as foreach with conditional to exit early if we are done.
// Do NOT use a while loop as it is much much slower.
lines.find { line =>
val isDone = try {
totalNumEvents += 1
val event = JsonProtocol.sparkEventFromJson(parse(line))
processEvent(event)
}
catch {
case e: ClassNotFoundException =>
// swallow any messages about this class since likely using spark version
// before 3.1
if (!e.getMessage.contains("SparkListenerResourceProfileAdded")) {
logWarning(s"ClassNotFoundException: ${e.getMessage}")
}
false
}
isDone
}
catch {
case e: ClassNotFoundException =>
// swallow any messages about this class since likely using spark version
// before 3.1
if (!e.getMessage.contains("SparkListenerResourceProfileAdded")) {
logWarning(s"ClassNotFoundException: ${e.getMessage}")
}
false
}
isDone
} catch {
case o: OutOfMemoryError =>
logError(s"OOM error while processing large file ${eventlog.toString}. " +
s"Increase heap size.", o)
throw o
case e: Exception =>
logError(s"Unexpected exception processing log ${eventlog.toString}, skipping!", e)
}
}
}