Qualification tool: Error handling while processing large event logs (#3714)

This fixes #3430.
In this PR, we catch `OutOfMemoryError` and log it with a hint to the user to increase the heap size. Previously, the tool would complete without logging anything.
I'm not sure how to add tests for this. I ran it locally with a small heap size and got the `logError` output on the console:

```
 java -Xmx512M -cp tools/target/rapids-4-spark-tools_2.12-21.12.0-SNAPSHOT.jar:/home/nartal/spark-3.1.1/spark-3.1.1-bin-hadoop3.2/jars/* com.nvidia.spark.rapids.tool.profiling.ProfileMain --csv /home/nartal/CPU_GPU_eventLogs/CPU_runs/


21/09/29 17:44:53 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
21/09/29 17:44:56 ERROR ApplicationInfo: OOM error while processing large files. Increase heap size to process file:/home/nartal/CPU_GPU_eventLogs/CPU_runs/application_1630450374626_0001_1
```
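
For context, here is a minimal, self-contained sketch of the handling pattern this change introduces. The object name `OomHandlingSketch`, the helper `safeProcess`, and the `processEventLog` callback are illustrative only, and plain stderr printing stands in for the tool's `logError`/`logWarning`. The key point is that `OutOfMemoryError` extends `Error`, not `Exception`, so the previous `case e: Exception` handlers never saw it:

```scala
import scala.util.control.NonFatal

object OomHandlingSketch {
  // Process one event log path and map failures to the behaviour described above:
  // OOM -> hint to increase the heap and exit, recoverable errors -> warn and skip,
  // any other fatal error -> report and exit.
  def safeProcess(path: String)(processEventLog: String => Unit): Unit = {
    try {
      processEventLog(path)
    } catch {
      // OutOfMemoryError is an Error, so a plain `case e: Exception` never matches it.
      case oom: OutOfMemoryError =>
        System.err.println(s"OOM error while processing large file: $path. " +
          "Increase heap size. Exiting ...")
        sys.exit(1)
      // Recoverable failures: warn and move on to the next event log.
      case NonFatal(e) =>
        System.err.println(s"Exception occurred processing file: $path: ${e.getMessage}")
      // Remaining fatal errors: report and exit.
      case t: Throwable =>
        System.err.println(s"Error occurred while processing file: $path. Exiting ...")
        sys.exit(1)
    }
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical usage: throw a synthetic OOM to exercise the first branch.
    safeProcess("file:/tmp/app_event_log") { _ => throw new OutOfMemoryError("simulated") }
  }
}
```

A unit test could follow the same idea as `main` above: drive the processing path with an input that throws a synthetic `OutOfMemoryError` and check the logged message, but wiring that into the real `Profiler`/`Qualification` classes is not attempted here.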
nartal1 authored Oct 8, 2021
1 parent 6f46115 commit 23a8a99
Showing 2 changed files with 45 additions and 17 deletions.
@@ -20,6 +20,7 @@ import java.util.concurrent.{ConcurrentLinkedQueue, Executors, ThreadPoolExecuto

import scala.collection.JavaConverters._
import scala.collection.mutable.{ArrayBuffer, HashMap}
import scala.util.control.NonFatal

import com.google.common.util.concurrent.ThreadFactoryBuilder
import com.nvidia.spark.rapids.tool.{EventLogInfo, EventLogPathProcessor}
@@ -106,14 +107,32 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs) extends Logging
}
}

private def errorHandler(error: Throwable, path: EventLogInfo) = {
error match {
case oom: OutOfMemoryError =>
logError(s"OOM error while processing large file: ${path.eventLog.toString}." +
s" Increase heap size. Exiting ...", oom)
sys.exit(1)
case NonFatal(e) =>
logWarning(s"Exception occurred processing file: ${path.eventLog.getName}", e)
case o: Throwable =>
logError(s"Error occurred while processing file: ${path.eventLog.toString}. Exiting ...", o)
sys.exit(1)
}
}

private def createApps(allPaths: Seq[EventLogInfo]): Seq[ApplicationInfo] = {
var errorCodes = ArrayBuffer[Int]()
val allApps = new ConcurrentLinkedQueue[ApplicationInfo]()

class ProfileThread(path: EventLogInfo, index: Int) extends Runnable {
def run: Unit = {
val appOpt = createApp(path, numOutputRows, index, hadoopConf)
appOpt.foreach(app => allApps.add(app))
try {
val appOpt = createApp(path, numOutputRows, index, hadoopConf)
appOpt.foreach(app => allApps.add(app))
} catch {
case t: Throwable => errorHandler(t, path)
}
}
}

@@ -145,18 +164,21 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs) extends Logging

class ProfileThread(path: EventLogInfo, index: Int) extends Runnable {
def run: Unit = {
val appOpt = createApp(path, numOutputRows, index, hadoopConf)
appOpt.foreach { app =>
val sum = try {
val (s, _) = processApps(Seq(app), false, profileOutputWriter)
Some(s)
} catch {
case e: Exception =>
logWarning(s"Unexpected exception thrown ${path.eventLog.toString}, skipping! ", e)
None

try {
val appOpt = createApp(path, numOutputRows, index, hadoopConf)
appOpt.foreach { app =>
val sum = try {
val (s, _) = processApps(Seq(app), false, profileOutputWriter)
Some(s)
} catch {
case e: Exception =>
logWarning(s"Unexpected exception thrown ${path.eventLog.toString}, skipping! ", e)
None
}
sum.foreach(allApps.add(_))
}
sum.foreach(allApps.add(_))
} catch {
case t: Throwable => errorHandler(t, path)
}
}
}
@@ -186,7 +208,7 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs) extends Logging
class ProfileProcessThread(path: EventLogInfo, index: Int) extends Runnable {
def run: Unit = {
try {
// we just skip apps that don't process cleanly
// we just skip apps that don't process cleanly and exit if the heap is too small
val appOpt = createApp(path, numOutputRows, index, hadoopConf)
appOpt match {
case Some(app) =>
@@ -204,8 +226,7 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs) extends Logging
logInfo("No application to process. Exiting")
}
} catch {
case e: Exception =>
logWarning(s"Exception occurred processing file: ${path.eventLog.getName}", e)
case t: Throwable => errorHandler(t, path)
}
}
}
@@ -109,8 +109,15 @@ class Qualification(outputDir: String, numRows: Int, hadoopConf: Configuration,
}
}
} catch {
case oom: OutOfMemoryError =>
logError(s"OOM error while processing large file: ${path.eventLog.toString}." +
s"Increase heap size.", oom)
System.exit(1)
case o: Error =>
logError(s"Error occured while processing file: ${path.eventLog.toString}", o)
System.exit(1)
case e: Exception =>
logError(s"Unexpected exception processing log ${path.eventLog.toString}, skipping!", e)
logWarning(s"Unexpected exception processing log ${path.eventLog.toString}, skipping!", e)
}
}
}
