diff --git a/tools/src/main/scala/com/nvidia/spark/rapids/tool/profiling/Profiler.scala b/tools/src/main/scala/com/nvidia/spark/rapids/tool/profiling/Profiler.scala
index 9b3741bbb91..437093b997e 100644
--- a/tools/src/main/scala/com/nvidia/spark/rapids/tool/profiling/Profiler.scala
+++ b/tools/src/main/scala/com/nvidia/spark/rapids/tool/profiling/Profiler.scala
@@ -20,6 +20,7 @@ import java.util.concurrent.{ConcurrentLinkedQueue, Executors, ThreadPoolExecuto
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.{ArrayBuffer, HashMap}
+import scala.util.control.NonFatal
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder
 import com.nvidia.spark.rapids.tool.{EventLogInfo, EventLogPathProcessor}
@@ -106,14 +107,32 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs) extends Logging
     }
   }
 
+  private def errorHandler(error: Throwable, path: EventLogInfo) = {
+    error match {
+      case oom: OutOfMemoryError =>
+        logError(s"OOM error while processing large file: ${path.eventLog.toString}." +
+          s" Increase heap size. Exiting ...", oom)
+        sys.exit(1)
+      case NonFatal(e) =>
+        logWarning(s"Exception occurred processing file: ${path.eventLog.getName}", e)
+      case o: Throwable =>
+        logError(s"Error occurred while processing file: ${path.eventLog.toString}. Exiting ...", o)
+        sys.exit(1)
+    }
+  }
+
   private def createApps(allPaths: Seq[EventLogInfo]): Seq[ApplicationInfo] = {
     var errorCodes = ArrayBuffer[Int]()
     val allApps = new ConcurrentLinkedQueue[ApplicationInfo]()
 
     class ProfileThread(path: EventLogInfo, index: Int) extends Runnable {
       def run: Unit = {
-        val appOpt = createApp(path, numOutputRows, index, hadoopConf)
-        appOpt.foreach(app => allApps.add(app))
+        try {
+          val appOpt = createApp(path, numOutputRows, index, hadoopConf)
+          appOpt.foreach(app => allApps.add(app))
+        } catch {
+          case t: Throwable => errorHandler(t, path)
+        }
       }
     }
 
@@ -145,18 +164,21 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs) extends Logging
 
     class ProfileThread(path: EventLogInfo, index: Int) extends Runnable {
       def run: Unit = {
-        val appOpt = createApp(path, numOutputRows, index, hadoopConf)
-        appOpt.foreach { app =>
-          val sum = try {
-            val (s, _) = processApps(Seq(app), false, profileOutputWriter)
-            Some(s)
-          } catch {
-            case e: Exception =>
-              logWarning(s"Unexpected exception thrown ${path.eventLog.toString}, skipping! ", e)
-              None
-
+        try {
+          val appOpt = createApp(path, numOutputRows, index, hadoopConf)
+          appOpt.foreach { app =>
+            val sum = try {
+              val (s, _) = processApps(Seq(app), false, profileOutputWriter)
+              Some(s)
+            } catch {
+              case e: Exception =>
+                logWarning(s"Unexpected exception thrown ${path.eventLog.toString}, skipping! ", e)
+                None
+            }
+            sum.foreach(allApps.add(_))
           }
-          sum.foreach(allApps.add(_))
+        } catch {
+          case t: Throwable => errorHandler(t, path)
         }
       }
     }
@@ -186,7 +208,7 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs) extends Logging
     class ProfileProcessThread(path: EventLogInfo, index: Int) extends Runnable {
       def run: Unit = {
         try {
-          // we just skip apps that don't process cleanly
+          // we just skip apps that don't process cleanly and exit if heap is smaller
           val appOpt = createApp(path, numOutputRows, index, hadoopConf)
           appOpt match {
             case Some(app) =>
@@ -204,8 +226,7 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs) extends Logging
             logInfo("No application to process. Exiting")
           }
         } catch {
-          case e: Exception =>
-            logWarning(s"Exception occurred processing file: ${path.eventLog.getName}", e)
+          case t: Throwable => errorHandler(t, path)
         }
       }
     }
diff --git a/tools/src/main/scala/com/nvidia/spark/rapids/tool/qualification/Qualification.scala b/tools/src/main/scala/com/nvidia/spark/rapids/tool/qualification/Qualification.scala
index 79dfdc84e9f..b8f1bc4dea9 100644
--- a/tools/src/main/scala/com/nvidia/spark/rapids/tool/qualification/Qualification.scala
+++ b/tools/src/main/scala/com/nvidia/spark/rapids/tool/qualification/Qualification.scala
@@ -109,8 +109,15 @@ class Qualification(outputDir: String, numRows: Int, hadoopConf: Configuration,
         }
       }
     } catch {
+      case oom: OutOfMemoryError =>
+        logError(s"OOM error while processing large file: ${path.eventLog.toString}." +
+          s" Increase heap size.", oom)
+        System.exit(1)
+      case o: Error =>
+        logError(s"Error occurred while processing file: ${path.eventLog.toString}", o)
+        System.exit(1)
       case e: Exception =>
-        logError(s"Unexpected exception processing log ${path.eventLog.toString}, skipping!", e)
+        logWarning(s"Unexpected exception processing log ${path.eventLog.toString}, skipping!", e)
     }
   }
 }