Improve implementation of finding median in StatisticsMetrics #1474
Fixes NVIDIA#1461
Adds an InPlace median finding to improve the performance of the metric aggregates. We used to sort a sequence to create StatisticsMetrics which turned out to be very expensive in large eventlogs.
Signed-off-by: Ahmed Hussein (amahussein) <a@ahussein.me>
Thanks @amahussein for these improvements. Minor question.
}

override def toString = {
  arr mkString ("ArraySize(", ", ", ")")
Q: Should this be bounded by until - from?
 * @return the median of the array.
 */
def findMedianInPlace(
    arr: Array[Long])(implicit choosePivot: InPlaceMedianArrView => Long): Long = {
This is an interesting use of the implicit mechanism.
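For readers less familiar with the pattern in the signature above, here is a minimal sketch of a curried method that takes its pivot strategy as an implicit function parameter. `ArrView`, `middlePivot`, `firstPivot`, and `pickPivot` are hypothetical stand-ins, not the PR's `InPlaceMedianArrView` or its actual pivot logic.

```scala
object ImplicitPivotSketch {
  // Hypothetical stand-in for a view over the window [from, until) of an array.
  final case class ArrView(arr: Array[Long], from: Int, until: Int) {
    def size: Int = until - from
  }

  // Two example pivot strategies (assumptions for illustration only).
  def middlePivot(v: ArrView): Long = v.arr(v.from + v.size / 2)
  def firstPivot(v: ArrView): Long = v.arr(v.from)

  // The strategy arrives through a second, implicit parameter list.
  def pickPivot(arr: Array[Long])(implicit choosePivot: ArrView => Long): Long =
    choosePivot(ArrView(arr, 0, arr.length))

  // Default strategy, picked up when the caller passes nothing explicitly.
  implicit val defaultPivot: ArrView => Long = middlePivot

  def demo: (Long, Long) = {
    val data = Array(10L, 20L, 30L)
    val viaImplicit = pickPivot(data)             // uses defaultPivot
    val viaExplicit = pickPivot(data)(firstPivot) // overrides the strategy
    (viaImplicit, viaExplicit)
  }
}
```

The appeal of this shape is that most call sites stay short while tests or special cases can still swap in another strategy explicitly.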
Thanks @amahussein for this refactor!
Fixes #1461
Adds an InPlace median finding to improve the performance of the metric aggregates.
We used to sort a sequence to create StatisticsMetrics which turned out to be very expensive in large eventlogs.
It was found that there is a bottleneck in generateStageLevelAccums and generateSQLAccums due to the cost of sorting and using sequences of Long for the metric aggregates.
Impact on performance:
The PR improves the Qualification runtime:
generateSQLAccums is down from 21,550 ms to 3,480 ms (~80% improvement)
generateStageLevelAccums is down from 115,580 ms to 61,611 ms (~45% improvement)
Main optimization
Median finding: we used to convert a map to a sequence, sort it, pick the median, max, and min, and then call seq.sum.
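To illustrate why an in-place median avoids the cost of a full sort, here is a minimal quickselect sketch. It is not the PR's implementation: `selectKth` and `medianInPlace` are hypothetical names, the pivot is simply the middle element, and for an even-length array the sketch assumes the lower of the two middle values is wanted.

```scala
object InPlaceMedianSketch {
  // Returns the k-th smallest element (0-based) using quickselect.
  // Reorders `arr` in place instead of allocating a sorted copy.
  def selectKth(arr: Array[Long], k: Int): Long = {
    require(k >= 0 && k < arr.length, "k out of range")
    var lo = 0
    var hi = arr.length - 1
    while (lo < hi) {
      val pivot = arr(lo + (hi - lo) / 2)
      var i = lo
      var j = hi
      // Hoare-style partition: values <= pivot end up left of j,
      // values >= pivot end up right of i.
      while (i <= j) {
        while (arr(i) < pivot) i += 1
        while (arr(j) > pivot) j -= 1
        if (i <= j) {
          val tmp = arr(i); arr(i) = arr(j); arr(j) = tmp
          i += 1
          j -= 1
        }
      }
      if (k <= j) hi = j        // target is in the left part
      else if (k >= i) lo = i   // target is in the right part
      else { lo = k; hi = k }   // arr(k) already holds the pivot value
    }
    arr(k)
  }

  // Assumption: lower-middle element for even-length arrays.
  def medianInPlace(arr: Array[Long]): Long =
    selectKth(arr, (arr.length - 1) / 2)
}
```

Quickselect only recurses into the side that contains the target index, so its expected cost is linear in the array length, whereas sorting the whole sequence is O(n log n).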
The PR also changes the AppSQLPlanAnalyzer and AppSparkMetricsAnalyzer classes to improve performance and simplify the codebase by using the breakOut method for collection transformations and introducing new methods in StatisticsMetrics. Additionally, it removes an unused object and refactors the AccumInfo class.
Impact on output
A diff of the output folders shows that sql_plan_metrics_for_application.csv is different. The new output looks more correct.
In the output generated by the dev branch:
The above seems incorrect because the total is non-zero while min, max and median are zeros.
Vs the new output
Improvements to Collection Transformations:
core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AppSQLPlanAnalyzer.scala: Replaced intermediate collection variables with direct transformations using breakOut to improve performance. [1] [2] [3] [4] [5]
core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AppSparkMetricsAnalyzer.scala: Applied breakOut to streamline collection processing and reduce memory overhead. [1] [2] [3] [4] [5] [6] [7] [8] [9] [10]
Introduction of New Methods in StatisticsMetrics:
core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/StatisticsMetrics.scala: Added createFromArr and createOptionalFromArr methods to compute statistics directly from arrays, improving code reuse and readability.
Refactoring and Cleanup:
core/src/main/scala/org/apache/spark/sql/rapids/tool/store/AccumInfo.scala: Refactored the calculateAccStats method to use the new createFromArr method from StatisticsMetrics, simplifying the code.
core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AppSparkMetricsAnalyzer.scala: Removed the unused AppSparkMetricsAnalyzer object and its getStatistics method.
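As a rough illustration of what computing statistics directly from an array can look like, the sketch below gathers min, max, and sum in one pass and returns an Option for empty input. The `Stats` type, its fields, and the names `statsFromArr` / `optionalStatsFromArr` are assumptions modeled on the description above, not the signatures of createFromArr and createOptionalFromArr. The median is delegated to a caller-supplied function so that the sketch stays short.

```scala
object StatsFromArraySketch {
  // Hypothetical result type; the field names are assumptions.
  final case class Stats(min: Long, median: Long, max: Long, total: Long)

  // Single pass over a non-empty array for min, max, and sum.
  // `medianOf` is supplied by the caller (for example an in-place selection).
  def statsFromArr(arr: Array[Long])(medianOf: Array[Long] => Long): Stats = {
    require(arr.nonEmpty, "array must be non-empty")
    var min = arr(0)
    var max = arr(0)
    var total = 0L
    var i = 0
    while (i < arr.length) {
      val v = arr(i)
      if (v < min) min = v
      if (v > max) max = v
      total += v
      i += 1
    }
    Stats(min, medianOf(arr), max, total)
  }

  // Empty input yields None instead of a record full of zeros.
  def optionalStatsFromArr(arr: Array[Long])(medianOf: Array[Long] => Long): Option[Stats] =
    if (arr.isEmpty) None else Some(statsFromArr(arr)(medianOf))

  // Sort-based median, used here only to keep the sketch self-contained.
  def sortedMedian(arr: Array[Long]): Long = {
    val sorted = arr.sorted
    sorted((arr.length - 1) / 2)
  }
}
```

A `while` loop over a primitive array avoids boxing and intermediate collections, which is the kind of overhead the PR description attributes to the old sequence-based path.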