Skip to content

Commit

Permalink
HPCC-31650 Address incorrect analyzer cost calculations and cost thre…
Browse files Browse the repository at this point in the history
…shold

* The rate used to calculate the cost of issues has been updated
so that the unit is consistent and produces valid cost calculations
* 'minInterestingCost' is a decimal value to set the minimum dollar
cost value for reported issues.
* 'minInterestingCost' is compared with the calculated cost of the
issue (rather than the timePenalty)

Signed-off-by: Shamser Ahmed <shamser.ahmed@lexisnexis.com>
  • Loading branch information
shamser committed Dec 20, 2024
1 parent 803672b commit 6ddcdc3
Show file tree
Hide file tree
Showing 7 changed files with 112 additions and 74 deletions.
12 changes: 5 additions & 7 deletions common/wuanalysis/anacommon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ void PerformanceIssue::print() const
printf("\n");
}

void PerformanceIssue::createException(IWorkUnit * wu, double costRate)
void PerformanceIssue::createException(IWorkUnit * wu)
{
ErrorSeverity mappedSeverity = wu->getWarningSeverity(errorCode, (ErrorSeverity)SeverityWarning);
if (mappedSeverity == SeverityIgnore)
Expand All @@ -66,17 +66,15 @@ void PerformanceIssue::createException(IWorkUnit * wu, double costRate)
StringBuffer s(comment); // Append scope to comment as scope column is not visible in ECLWatch
s.appendf(" (%s)", scope.str());
we->setExceptionMessage(s.str());
if (costRate!=0.0)
{
double timePenaltyPerHour = (double)statUnits2seconds(timePenalty) / 3600;
we->setCost(timePenaltyPerHour*costRate);
}
if (costPenalty!=0)
we->setCost(cost_type2money(costPenalty));
we->setExceptionSource(CostOptimizerName);
}

void PerformanceIssue::set(AnalyzerErrorCode _errorCode, stat_type _timePenalty, const char * msg, ...)
void PerformanceIssue::set(AnalyzerErrorCode _errorCode, stat_type _timePenalty, cost_type _costPenalty, const char * msg, ...)
{
timePenalty = _timePenalty;
costPenalty = _costPenalty;
errorCode = _errorCode;
va_list args;
va_start(args, msg);
Expand Down
9 changes: 6 additions & 3 deletions common/wuanalysis/anacommon.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,13 @@ class PerformanceIssue : public CInterface
public:
int compareCost(const PerformanceIssue & other) const;
void print() const;
void createException(IWorkUnit * we, double costRate);
void createException(IWorkUnit * we);

void set(AnalyzerErrorCode _errorCode, stat_type _timePenalty, const char * msg, ...) __attribute__((format(printf, 4, 5)));
void set(AnalyzerErrorCode _errorCode, stat_type _timePenalty, cost_type _costPenalty, const char * msg, ...) __attribute__((format(printf, 5, 6)));
void setLocation(const char * definition);
void setScope(const char *_scope) { scope.set(_scope); }
stat_type getTimePenalityCost() const { return timePenalty; }
stat_type getTimePenalty() const { return timePenalty; }
cost_type getCostPenalty() const { return costPenalty; }

private:
AnalyzerErrorCode errorCode = ANA_GENERICERROR_ID;
Expand All @@ -76,6 +77,7 @@ class PerformanceIssue : public CInterface
unsigned column = 0;
StringAttr scope;
stat_type timePenalty = 0; // number of nanoseconds lost as a result.
cost_type costPenalty = 0;
StringBuffer comment;
};

Expand All @@ -87,6 +89,7 @@ enum WutOptionType
watOptSkewThreshold,
watOptMinRowsPerNode,
watPreFilteredKJThreshold,
watCostRatePerHour,
watOptMax
};

Expand Down
93 changes: 60 additions & 33 deletions common/wuanalysis/anarule.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,13 @@
#include "anarule.hpp"
#include "commonext.hpp"


// Convert the time lost to an issue into a cost.
//
// timePenalty         - time lost, in stat units (converted to seconds via statUnits2seconds)
// clusterCostPerHour  - hourly cost rate; the result is expressed in the same units as this rate
//
// Returns (time lost expressed in hours) * clusterCostPerHour, converted to cost_type.
static cost_type calcIssueCost(stat_type timePenalty, const stat_type clusterCostPerHour)
{
    // 3600 seconds per hour: express the penalty as a (fractional) number of hours
    const double hoursLost = static_cast<double>(statUnits2seconds(timePenalty)) / 3600;
    return static_cast<cost_type>(hoursLost * clusterCostPerHour);
}

class ActivityKindRule : public AActivityRule
{
public:
Expand Down Expand Up @@ -52,21 +59,24 @@ class DistributeSkewRule : public ActivityKindRule
stat_type rowsMaxSkew = outputEdge->getStatRaw(StNumRowsProcessed, StSkewMax);
if (rowsMaxSkew > options.queryOption(watOptSkewThreshold))
{
// Use downstream activity time to calculate approximate cost
// Use downstream activity time to calculate approximate timePenalty
IWuActivity * targetActivity = outputEdge->queryTarget();
assertex(targetActivity);
stat_type timeMaxLocalExecute = targetActivity->getStatRaw(StTimeLocalExecute, StMaxX);
stat_type timeAvgLocalExecute = targetActivity->getStatRaw(StTimeLocalExecute, StAvgX);
// Consider ways to improve this cost calculation further
stat_type cost = timeMaxLocalExecute - timeAvgLocalExecute;

IWuEdge * inputEdge = activity.queryInput(0);
if (inputEdge && (inputEdge->getStatRaw(StNumRowsProcessed, StSkewMax) < rowsMaxSkew))
result.set(ANA_DISTRIB_SKEW_INPUT_ID, cost, "DISTRIBUTE output skew is worse than input skew");
else
result.set(ANA_DISTRIB_SKEW_OUTPUT_ID, cost, "Significant skew in DISTRIBUTE output");
updateInformation(result, activity);
return true;
// Consider ways to improve this timePenalty calculation further
stat_type timePenalty = timeMaxLocalExecute - timeAvgLocalExecute;
cost_type costPenalty = calcIssueCost(timePenalty, options.queryOption(watCostRatePerHour));
if (costPenalty >= options.queryOption(watOptMinInterestingCost))
{
IWuEdge * inputEdge = activity.queryInput(0);
if (inputEdge && (inputEdge->getStatRaw(StNumRowsProcessed, StSkewMax) < rowsMaxSkew))
result.set(ANA_DISTRIB_SKEW_INPUT_ID, timePenalty, costPenalty, "DISTRIBUTE output skew is worse than input skew");
else
result.set(ANA_DISTRIB_SKEW_OUTPUT_ID, timePenalty, costPenalty, "Significant skew in DISTRIBUTE output");
updateInformation(result, activity);
return true;
}
}
return false;
}
Expand Down Expand Up @@ -135,12 +145,13 @@ class IoSkewRule : public AActivityRule
stat_type timeMaxLocalExecute = activity.getStatRaw(StTimeLocalExecute, StMaxX);
stat_type timeAvgLocalExecute = activity.getStatRaw(StTimeLocalExecute, StAvgX);

stat_type cost;
stat_type timePenalty;
const char * msg = nullptr;
if ((actkind==TAKspillread||actkind==TAKspillwrite) && (activity.getStatRaw(stat, StMinX) == 0))
{
//If one node didn't spill then it is possible the skew caused all the lost time
cost = timeMaxLocalExecute;
result.set(ANA_IOSKEW_RECORDS_ID, cost, "Uneven worker spilling is causing uneven %s time", category);
timePenalty = timeMaxLocalExecute;
msg = "Uneven worker spilling";
}
else
{
Expand All @@ -161,19 +172,25 @@ class IoSkewRule : public AActivityRule
}
if (wuEdge && wuEdge->getStatRaw(StNumRowsProcessed, StSkewMax)>options.queryOption(watOptSkewThreshold))
numRowsSkew = true;
cost = (timeMaxLocalExecute - timeAvgLocalExecute);
timePenalty = (timeMaxLocalExecute - timeAvgLocalExecute);
if (sizeSkew)
{
if (numRowsSkew)
result.set(ANA_IOSKEW_RECORDS_ID, cost, "Significant skew in number of records is causing uneven %s time", category);
msg = "Significant skew in number of records";
else
result.set(ANA_IOSKEW_RECORDS_ID, cost, "Significant skew in record sizes is causing uneven %s time", category);
msg = "Significant skew in record sizes";
}
else
result.set(ANA_IOSKEW_RECORDS_ID, cost, "Significant skew in IO performance is causing uneven %s time", category);
msg = "Significant skew in IO performance";
}
assertex(msg);
cost_type costPenalty = calcIssueCost(timePenalty, options.queryOption(watCostRatePerHour));
if (costPenalty >= options.queryOption(watOptMinInterestingCost))
{
result.set(ANA_IOSKEW_RECORDS_ID, timePenalty, costPenalty, "%s is causing uneven %s time", msg, category);
updateInformation(result, activity);
return true;
}
updateInformation(result, activity);
return true;
}
return false;
}
Expand Down Expand Up @@ -206,7 +223,7 @@ class LocalExecuteSkewRule : public AActivityRule

stat_type timeMaxLocalExecute = activity.getStatRaw(StTimeLocalExecute, StMaxX);
stat_type timeAvgLocalExecute = activity.getStatRaw(StTimeLocalExecute, StAvgX);
stat_type timePenalty = (timeMaxLocalExecute - timeAvgLocalExecute);;
stat_type timePenalty = (timeMaxLocalExecute - timeAvgLocalExecute);
if (timePenalty<options.queryOption(watOptMinInterestingTime))
return false;

Expand All @@ -224,13 +241,19 @@ class LocalExecuteSkewRule : public AActivityRule
if (wuOutputEdge && (wuOutputEdge->getStatRaw(StNumRowsProcessed, StSkewMax)>options.queryOption(watOptSkewThreshold)))
outputSkewed = true;

if (inputSkewed)
result.set(ANA_EXECUTE_SKEW_ID, timePenalty, "Significant skew in local execute time caused by uneven input");
else if (outputSkewed)
result.set(ANA_EXECUTE_SKEW_ID, timePenalty, "Significant skew in local execute time caused by uneven output");
else
result.set(ANA_EXECUTE_SKEW_ID, timePenalty, "Significant skew in local execute time");
return true;
cost_type costPenalty = calcIssueCost(timePenalty, options.queryOption(watCostRatePerHour));
if (costPenalty >= options.queryOption(watOptMinInterestingCost))
{
if (inputSkewed)
result.set(ANA_EXECUTE_SKEW_ID, timePenalty, costPenalty, "Significant skew in local execute time caused by uneven input");
else if (outputSkewed)
result.set(ANA_EXECUTE_SKEW_ID, timePenalty, costPenalty, "Significant skew in local execute time caused by uneven output");
else
result.set(ANA_EXECUTE_SKEW_ID, timePenalty, costPenalty, "Significant skew in local execute time");
updateInformation(result, activity);
return true;
}
return false;
}
};

Expand All @@ -252,12 +275,16 @@ class KeyedJoinExcessRejectedRowsRule : public ActivityKindRule
if (preFilteredPer > options.queryOption(watPreFilteredKJThreshold))
{
IWuActivity * inputActivity = inputEdge->querySource();
// Use input activity as the basis of cost because the rows generated from input activity is being filtered out
// Use input activity as the basis of timePenalty because the rows generated from input activity is being filtered out
stat_type timeAvgLocalExecute = inputActivity->getStatRaw(StTimeLocalExecute, StAvgX);
stat_type cost = statPercentageOf(timeAvgLocalExecute, preFilteredPer);
result.set(ANA_KJ_EXCESS_PREFILTER_ID, cost, "Large number of rows from left dataset rejected in keyed join");
updateInformation(result, activity);
return true;
stat_type timePenalty = statPercentageOf(timeAvgLocalExecute, preFilteredPer);
cost_type costPenalty = calcIssueCost(timePenalty, options.queryOption(watCostRatePerHour));
if (costPenalty >= options.queryOption(watOptMinInterestingCost))
{
result.set(ANA_KJ_EXCESS_PREFILTER_ID, timePenalty, costPenalty, "Large number of rows from left dataset rejected in keyed join");
updateInformation(result, activity);
return true;
}
}
}
}
Expand Down
60 changes: 35 additions & 25 deletions common/wuanalysis/anawu.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ enum WutOptValueType
wutOptValueTypePercent,
wutOptValueTypeCount,
wutOptValueTypeBool,
wutOptValueTypeCost,
wutOptValueTypeMax,
};

Expand All @@ -71,10 +72,12 @@ struct WuOption

constexpr struct WuOption wuOptionsDefaults[watOptMax]
= { {watOptMinInterestingTime, "minInterestingTime", 1000, wutOptValueTypeMSec},
{watOptMinInterestingCost, "minInterestingCost", 30000, wutOptValueTypeMSec},
{watOptMinInterestingCost, "minInterestingCost", money2cost_type(5.0) /* $5 */, wutOptValueTypeCost},
{watOptSkewThreshold, "skewThreshold", 20, wutOptValueTypePercent},
{watOptMinRowsPerNode, "minRowsPerNode", 1000, wutOptValueTypeCount},
{watPreFilteredKJThreshold, "preFilteredKJThreshold", 50, wutOptValueTypePercent},
/* Note watCostRatePerHour cannot be used as debug option or config option (this is calculated) */
{watCostRatePerHour, "costRatePerHour", 0, wutOptValueTypeCost},
};

constexpr bool checkWuOptionsDefaults(int i = watOptMax)
Expand Down Expand Up @@ -107,9 +110,8 @@ class WuAnalyserOptions : public IAnalyserOptions
case wutOptValueTypePercent:
wuOptions[opt] = statPercent((stat_type)val);
break;
case wutOptValueTypeCost:
case wutOptValueTypeCount:
wuOptions[opt] = (stat_type) val;
break;
case wutOptValueTypeBool:
wuOptions[opt] = (stat_type) val;
break;
Expand All @@ -125,9 +127,13 @@ class WuAnalyserOptions : public IAnalyserOptions
{
StringBuffer wuOptionName("@");
wuOptionName.append(wuOptionsDefaults[opt].name);
__int64 val = options->getPropInt64(wuOptionName, -1);
if (val!=-1)
setOptionValue(static_cast<WutOptionType>(opt), val);
__int64 val = 0;
if (opt==watOptMinInterestingCost)
val = money2cost_type(options->getPropReal(wuOptionName, cost_type2money(-1.0)));
else
val = options->getPropInt64(wuOptionName, -1);
if (val>0)
setOptionValue(static_cast<WutOptionType>(opt), money2cost_type(val));
}
}

Expand All @@ -137,9 +143,13 @@ class WuAnalyserOptions : public IAnalyserOptions
{
StringBuffer wuOptionName("analyzer_");
wuOptionName.append(wuOptionsDefaults[opt].name);
__int64 val = wu->getDebugValueInt64(wuOptionName, -1);
if (val!=-1)
setOptionValue(static_cast<WutOptionType>(opt), val);
__int64 val = 0;
if (opt==watOptMinInterestingCost)
val = money2cost_type(wu->getDebugValueReal(wuOptionName, cost_type2money(-1.0)));
else
val = wu->getDebugValueInt64(wuOptionName, -1);
if (val>0)
setOptionValue(static_cast<WutOptionType>(opt), money2cost_type(val));
}
}
stat_type queryOption(WutOptionType opt) const override { return wuOptions[opt]; }
Expand Down Expand Up @@ -175,12 +185,12 @@ class WorkunitRuleAnalyser : public WorkunitAnalyserBase
public:
WorkunitRuleAnalyser();

void applyConfig(IPropertyTree *cfg, IConstWorkUnit * wu);
void applyConfig(IPropertyTree *cfg, IConstWorkUnit * wu, double _costRate);

void applyRules();
void check(const char * scope, IWuActivity & activity);
void print();
void update(IWorkUnit *wu, double costRate);
void update(IWorkUnit *wu);

protected:
CIArrayOf<AActivityRule> rules;
Expand Down Expand Up @@ -1353,10 +1363,13 @@ WorkunitRuleAnalyser::WorkunitRuleAnalyser()
gatherRules(rules);
}

void WorkunitRuleAnalyser::applyConfig(IPropertyTree *cfg, IConstWorkUnit * wu)
void WorkunitRuleAnalyser::applyConfig(IPropertyTree *cfg, IConstWorkUnit * wu, double costRate)
{
options.applyConfig(cfg);
options.applyConfig(wu);
/* watCostRatePerHour is calculated by caller and its value is set in options*/
/* (So, watCostRatePerHour cannot be used as debug option or config option)*/
options.setOptionValue(watCostRatePerHour, money2cost_type(costRate));
}


Expand All @@ -1372,11 +1385,8 @@ void WorkunitRuleAnalyser::check(const char * scope, IWuActivity & activity)
Owned<PerformanceIssue> issue (new PerformanceIssue);
if (rules.item(i).check(*issue, activity, options))
{
if (issue->getTimePenalityCost() >= options.queryOption(watOptMinInterestingCost))
{
if (!highestCostIssue || highestCostIssue->getTimePenalityCost() < issue->getTimePenalityCost())
highestCostIssue.setown(issue.getClear());
}
if (!highestCostIssue || highestCostIssue->getTimePenalty() < issue->getTimePenalty())
highestCostIssue.setown(issue.getClear());
}
}
}
Expand All @@ -1401,10 +1411,10 @@ void WorkunitRuleAnalyser::print()
issues.item(i).print();
}

void WorkunitRuleAnalyser::update(IWorkUnit *wu, double costRate)
void WorkunitRuleAnalyser::update(IWorkUnit *wu)
{
ForEachItemIn(i, issues)
issues.item(i).createException(wu, costRate);
issues.item(i).createException(wu);
}


Expand Down Expand Up @@ -2087,27 +2097,27 @@ void WorkunitStatsAnalyser::traceDependencies()

//---------------------------------------------------------------------------------------------------------------------

void WUANALYSIS_API analyseWorkunit(IWorkUnit * wu, const char *optGraph, IPropertyTree *options, double costPerMs)
void WUANALYSIS_API analyseWorkunit(IWorkUnit * wu, const char *optGraph, IPropertyTree *options, double costPerHour)
{
WorkunitRuleAnalyser analyser;
analyser.applyConfig(options, wu);
analyser.applyConfig(options, wu, costPerHour);
analyser.analyse(wu, optGraph);
analyser.applyRules();
analyser.update(wu, costPerMs);
analyser.update(wu);
}

void WUANALYSIS_API analyseAndPrintIssues(IConstWorkUnit * wu, double costRate, bool updatewu)
void WUANALYSIS_API analyseAndPrintIssues(IConstWorkUnit * wu, double costPerHour, bool updatewu)
{
WorkunitRuleAnalyser analyser;
analyser.applyConfig(nullptr, wu);
analyser.applyConfig(nullptr, wu, costPerHour);
analyser.analyse(wu, nullptr);
analyser.applyRules();
analyser.print();
if (updatewu)
{
Owned<IWorkUnit> lockedwu = &(wu->lock());
lockedwu->clearExceptions(CostOptimizerName);
analyser.update(lockedwu, costRate);
analyser.update(lockedwu);
}
}

Expand Down
4 changes: 2 additions & 2 deletions common/wuanalysis/anawu.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@

#include "anacommon.hpp"

void WUANALYSIS_API analyseWorkunit(IWorkUnit * wu, const char *optGraph, IPropertyTree *options, double costPerMs);
void WUANALYSIS_API analyseAndPrintIssues(IConstWorkUnit * wu, double costPerMs, bool updatewu);
void WUANALYSIS_API analyseWorkunit(IWorkUnit * wu, const char *optGraph, IPropertyTree *options, double costPerHour);
void WUANALYSIS_API analyseAndPrintIssues(IConstWorkUnit * wu, double costPerHour, bool updatewu);

//---------------------------------------------------------------------------------------------------------------------

Expand Down
Loading

0 comments on commit 6ddcdc3

Please sign in to comment.