Druid Query Cost Model
When generating a Spark Physical Plan containing Druid Queries, we have the option of issuing the Druid Queries to the Broker, or we can issue segment-level queries to Historicals and aggregate the segment results in Spark. Further, if we issue queries against Druid Historicals, we have the option of processing one or more index segments per query.
- For a Broker-executed query, the PhysicalScan operator in the Spark Plan has 1 partition; the entire Druid Query result flows through the single task for this operator.
- For a Historical-executed query, the PhysicalScan operator in the Spark Plan has P partitions, where P is calculated based on several factors such as the parallelism of Spark, the parallelism of Druid, and the number of segments in the Druid index. Each task ingests the results from a set of Druid segments. The SparkPlan contains an Aggregate operator on top of the PhysicalScan to merge the results of all the Druid Queries, so for this kind of plan the overall cost must factor in the merging (Spark shuffle) cost along with the Druid execution cost.
None of the options is optimal for all plans:
- A query against the Broker can be expensive if a lot of output rows have to be merged in the Broker. For example, if we execute
  `select large_cardinality_dim, count(*) from table`
  as a broker query, a large number of rows will be shipped from the Historicals to the Broker, and the Broker may be overwhelmed by the processing required to merge them.
- On the other hand, for a query with very few output rows, such as
  `select count(*) from table`
  which outputs only one row, the cost of performing a shuffle in Spark may dominate the Druid query phase of the Plan.
In order to make a plan choice we have come up with a Cost Model which attempts to factor in the System Landscape, the Column Cardinalities and the Query Characteristics. The cost model is on by default, but can be turned off; the system setting controlling this is `spark.sparklinedata.druid.querycostmodel.enabled`. We continue to support the 'old style' plan choice where the user specifies the `queryHistoricalServers` and `numSegmentsPerHistoricalQuery` druid datasource options (see [Druid DataSource Options](https://github.com/SparklineData/spark-druid-olap/wiki/Druid-Datasource-Options)), though we strongly recommend using the Cost Model and leaving the `spark.sparklinedata.druid.querycostmodel.enabled` flag set to true.
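For instance, the cost model can be switched off to fall back to the old-style behavior. This is a minimal sketch, assuming the setting is picked up from the Spark SQL conf of the context used to register the Druid datasource; it can also be passed as a `--conf` option at startup:

```scala
// Sketch: disable the cost model and fall back to the
// queryHistoricalServers / numSegmentsPerHistoricalQuery datasource options.
// Assumes the setting is read from the Spark SQL conf.
sqlContext.setConf("spark.sparklinedata.druid.querycostmodel.enabled", "false")

// Or, when starting the spark-shell / ThriftServer:
//   --conf spark.sparklinedata.druid.querycostmodel.enabled=false
```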
Cost Factors
Name(1) | Description | Default Value |
---|---|---|
enabled | flag that controls whether the decision to execute the druid query on the broker or the historicals is based on the cost model. If false, the decision is based on the 'queryHistoricalServers' and 'numSegmentsPerHistoricalQuery' datasource options. | true |
histMergeCostFactor | cost of performing a segment aggregate merge in druid historicals, relative to the spark shuffle cost | 0.07 |
histSegsPerQueryLimit | the max. number of segments processed per historical query | 5 |
queryintervalScalingForDistinctValues | the ndv estimate for a query interval uses this number. The ratio of (query interval/index interval) is multiplied by this number. The ndv for a query is estimated as 'log(this_value * min(10*interval_ratio,10)) * orig_ndv'. The reduction is logarithmic, and this value applies a further dampening factor on the reduction. At the default value of '3', any interval ratio >= 0.33 will have no reduction in ndvs. | 3 |
historicalProcessingCost | the cost per row of groupBy processing in historical servers, relative to the spark shuffle cost | 0.1 |
historicalTimeSeriesProcessingCost | the cost per row of timeseries processing in historical servers, relative to the spark shuffle cost | 0.1 |
sparkSchedulingCost | the cost of scheduling a task in spark, relative to the shuffle cost of 1 row | 1.0 |
sparkAggregatingCost | the cost per row of aggregation in spark, relative to the shuffle cost | 0.15 |
druidOutputTransportCost | the cost per row of transporting druid output, relative to the shuffle cost | 0.4 |
(1) All parameter names begin with the prefix `spark.sparklinedata.druid.querycostmodel.`
We assume that the cost to shuffle 1 row in Spark is 1. All other costs are relative factors of this cost. For example, by default we assume the cost to process a row in Druid historicals is 0.25 of the shuffle cost.
Estimate Query Output Size
We first estimate the cardinality without considering any time interval reduction.
Output Cardinality = product over the groupBy dimensions of: numDistinctValues(dimension) * selectivity(dimension)
Where selectivity:
- is only applied for equality and in predicates.
- a predicate on a non-groupBy dimension is assumed not to change the cardinality estimate.
- or and not predicates are also assumed not to reduce cardinality.
- any date function extractions are also ignored, and so don't reduce cardinality.
- any predicates on non-groupBy dimensions also don't reduce output cardinality.
For the Time Dimension, we don't know the cardinality, so we assume it to be Int.MaxValue (~2 billion).
For example:
- query 1: `select d1, count(*) from table where d2 = <value>` will have an estimate of d1.numDistinctValues. The predicate on d2 doesn't reduce the output size.
- query 2: `select d1, count(*) from table where d1 = <value>` will have an estimate of 1.
- query 3: `select d1, count(*) from table where d1 in (<v1>, <v2>)` will have an estimate of 2.
- query 4: `select d1, count(*) from table where d1 in (<v1>, <v2>) or d3 = <value>` will have an estimate of d1.numDistinctValues. The or predicate doesn't reduce the output size.
Finally, we apply a reduction factor for the query time interval, based on `spark.sparklinedata.druid.querycostmodel.queryintervalScalingForDistinctValues`. The final calculation is:
```scala
val intervalRatio : Double = Math.min(queryIntervalMillis/indexIntervalMillis, 1.0)
val scaledRatio = Math.max(queryIntervalRatioScaleFactor * intervalRatio * 10.0, 10.0)
Math.round(ndvForIndexEstimate * Math.log10(scaledRatio))
```
`queryIntervalMillis` and `indexIntervalMillis` are the milliseconds in the query interval and the total index interval respectively.
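As a worked example, using the numbers from the explain output shown later on this page (the query covers the whole index interval, the default scale factor of 3.0, and a time-dimension ndv of Int.MaxValue):

```scala
// Worked example of the ndv interval scaling, using values from the explain output below.
val queryIntervalMillis = 157680001000L
val indexIntervalMillis = 157680001000L
val queryIntervalRatioScaleFactor = 3.0
val ndvForIndexEstimate = Int.MaxValue.toLong   // assumed time-dimension cardinality

val intervalRatio: Double = Math.min(queryIntervalMillis.toDouble / indexIntervalMillis, 1.0) // 1.0
val scaledRatio = Math.max(queryIntervalRatioScaleFactor * intervalRatio * 10.0, 10.0)        // 30.0
val queryOutputSizeEstimate = Math.round(ndvForIndexEstimate * Math.log10(scaledRatio))
// log10(30) ≈ 1.477, so the estimate is ≈ 3172093739 -- matching
// queryOutputSizeEstimate in the GroupBy query's cost details below.
```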
Landscape Factors
Name | Description | Calculation |
---|---|---|
sparkCoresPerExecutor | the number of cores per Spark Executor | based on the "spark.executor.cores" setting, or sparkContext.schedulerBackend.defaultParallelism |
numProcessingThreadsPerHistorical | the number of processing threads per historical server | based on the numProcessingThreadsPerHistorical setting on the Druid DataSource; if it is not specified, it is assumed to be the same as sparkCoresPerExecutor |
numHistoricals | the number of historical servers | read from the Zookeeper ensemble |
Cost Calculations
- Common Calculations
histProcessingCostPerRow is based on historicalTimeSeriesProcessingCostPerRowFactor or historicalGByProcessigCostPerRowFactor, depending on the type of DruidQuery.
queryOutputSizeEstimate is calculated based on the ratio of queryIntervalMillis and indexIntervalMillis.
segmentOutputSizeEstimate is calculated based on the ratio of segIntervalMillis and indexIntervalMillis.
numHistoricalThreads = numHistoricals * numProcessingThreadsPerHistorical
parallelismPerWave = Math.min(numHistoricalThreads, numSparkCores)
def estimateNumWaves(numSegsPerQuery : Long) : Long =
Math.round(
(numSegmentsProcessed/numSegsPerQuery)/ parallelismPerWave + 0.5
)
- Broker Cost
numWaves = 1
brokerMergeCost = (numSegmentsProcessed - 1) * segmentOutputSizeEstimate * brokerMergeCostPerRow
segmentOutputTransportCost = queryOutputSizeEstimate * (druidOutputTransportCostPerRowFactor * shuffleCostPerRow)
queryCost = numWaves * processingCostPerHist + segmentOutputTransportCost + brokerMergeCost
- Historical Cost for a particular setting of numSegmentsPerQuery
numWaves = estimateNumWaves(numSegsPerQuery)
estimateOutputSizePerHist = estimate size based on query size of 'segIntervalMillis * numSegsPerQuery'
processingCostPerHist = numSegsPerQuery * segmentOutputSizeEstimate * histProcessingCostPerRow
histMergeCost = (numSegsPerQuery - 1) * segmentOutputSizeEstimate * histMergeCostPerRow
segmentOutputTransportCost = estimateOutputSizePerHist * (druidOutputTransportCostPerRowFactor * shuffleCostPerRow)
shuffleCost = numWaves * segmentOutputSizeEstimate * shuffleCostPerRow
sparkSchedulingCost = numWaves * Math.min(parallelismPerWave, numSegmentsProcessed) * sparkSchedulingCostPerTask
sparkAggCost = numWaves * segmentOutputSizeEstimate * (sparkAggregationCostPerRowFactor * shuffleCostPerRow)
costPerHistoricalWave = processingCostPerHist + histMergeCost + segmentOutputTransportCost
druidStageCost = numWaves * costPerHistoricalWave
queryCost = druidStageCost + shuffleCost + sparkSchedulingCost + sparkAggCost
- Cost Algorithm
The `spark.sparklinedata.druid.querycostmodel.histSegsPerQueryLimit` context parameter controls the limit of the 'numSegmentsPerQuery' parameter (default is 3). We estimate the broker cost and the historical plan cost with different values of 'numSegmentsPerQuery' up to the limit, and pick the plan with the lowest cost (see the sketch below). For certain queries the historical plan is not an option, so these queries are issued against the Broker.
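The following is a minimal, self-contained sketch of this plan-choice logic. The case class, its field names and defaults are illustrative, not the actual implementation; the formulas follow the Broker and Historical cost calculations listed above, with the broker's historical-side processing term simplified.

```scala
// Hypothetical inputs to the cost comparison (names are illustrative).
case class CostInput(
  queryOutputSizeEstimate: Long,
  segmentOutputSizeEstimate: Long,
  numSegmentsProcessed: Long,
  numHistoricalThreads: Long,
  numSparkCores: Long,
  shuffleCostPerRow: Double = 1.0,
  histMergeCostPerRow: Double = 0.2,
  histProcessingCostPerRow: Double = 0.25, // groupBy; 0.1 for timeseries
  sparkSchedulingCostPerTask: Double = 1.0,
  sparkAggCostPerRow: Double = 0.15,
  outputTransportCostPerRow: Double = 0.4,
  histSegsPerQueryLimit: Int = 3)

/** Returns (queryHistoricalServers, numSegmentsPerQuery, cost) for the cheapest plan. */
def chooseCheapestPlan(c: CostInput): (Boolean, Int, Double) = {
  import c._
  val parallelismPerWave = math.min(numHistoricalThreads, numSparkCores)

  def numWaves(segsPerQuery: Int): Long =
    Math.round((numSegmentsProcessed / segsPerQuery) / parallelismPerWave + 0.5)

  // Broker plan: a single "wave"; the broker merges all segment results.
  // (The historical-side processing term is simplified here.)
  val brokerCost =
    segmentOutputSizeEstimate * histProcessingCostPerRow +
      (numSegmentsProcessed - 1) * segmentOutputSizeEstimate * histMergeCostPerRow +
      queryOutputSizeEstimate * outputTransportCostPerRow * shuffleCostPerRow

  // Historical plans: one candidate per numSegmentsPerQuery value, up to the limit.
  val historicalCosts = (1 to histSegsPerQueryLimit).map { segsPerQuery =>
    val waves = numWaves(segsPerQuery)
    val costPerHistoricalWave =
      segsPerQuery * segmentOutputSizeEstimate * histProcessingCostPerRow +        // processing
        (segsPerQuery - 1) * segmentOutputSizeEstimate * histMergeCostPerRow +     // merge
        segmentOutputSizeEstimate * outputTransportCostPerRow * shuffleCostPerRow  // transport (simplified)
    val sparkSideCost =
      waves * segmentOutputSizeEstimate * shuffleCostPerRow +                                      // shuffle
        waves * math.min(parallelismPerWave, numSegmentsProcessed) * sparkSchedulingCostPerTask +  // scheduling
        waves * segmentOutputSizeEstimate * sparkAggCostPerRow * shuffleCostPerRow                 // spark agg
    (segsPerQuery, waves * costPerHistoricalWave + sparkSideCost)
  }

  val (bestSegsPerQuery, bestHistCost) = historicalCosts.minBy(_._2)
  if (brokerCost <= bestHistCost) (false, -1, brokerCost) // issue the query to the Broker
  else (true, bestSegsPerQuery, bestHistCost)             // issue per-segment Historical queries
}
```

Plugging in the GroupBy query's numbers from the explain output below (queryOutputSizeEstimate = 3172093739, segmentOutputSizeEstimate = 2147483647, 59 segments, 32 historical threads, 32 Spark cores), this sketch picks the historical plan with numSegmentsPerQuery = 2 at a cost of ≈ 4.8318E9, matching the plan choice and bestCost shown in that output.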
Users can issue an ‘explain druid rewrite’ on any sql query to get the Druid Queries and the cost estimates for the different physical plan choices for each Druid Query. For example:
explain druid rewrite
SELECT COUNT(DISTINCT CAST(`orderLineItemPartSupplier`.`l_shipdate` AS TIMESTAMP))
AS `ctd_date_string_ok`
FROM `orderLineItemPartSupplier`
HAVING (COUNT(1) > 0)
outputs:
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| |
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|Project [ctd_date_string_ok#319L] |
|+- Filter havingCondition#373: boolean |
| +- TungstenAggregate(key=[], functions=[(first(if ((gid#374 = 0)) count(1)#377L else null) ignore nulls,mode=Final,isDistinct=false),(count(if ((gid#374 = 1)) cast(l_shipdate as timestamp)#375 else null),mode=Final,isDistinct=false)], output=[havingCondition#373,ctd_date_string_ok#319L])|
| +- TungstenAggregate(key=[], functions=[(first(if ((gid#374 = 0)) count(1)#377L else null) ignore nulls,mode=Partial,isDistinct=false),(count(if ((gid#374 = 1)) cast(l_shipdate as timestamp)#375 else null),mode=Partial,isDistinct=false)], output=[first#385L,valueSet#386,count#387L]) |
| +- Union |
| :- Project [null AS cast(l_shipdate as timestamp)#375,0 AS gid#374,alias-1#381L AS count(1)#377L] |
| : +- Scan DruidQuery(774346563): { |
| "q" : { |
| "jsonClass" : "TimeSeriesQuerySpec", |
| "queryType" : "timeseries", |
| "dataSource" : "tpch", |
| "intervals" : [ "1993-01-01T00:00:00.000Z/1997-12-31T00:00:01.000Z" ], |
| "granularity" : "all", |
| "aggregations" : [ { |
| "jsonClass" : "FunctionAggregationSpec", |
| "type" : "longSum", |
| "name" : "alias-1", |
| "fieldName" : "count" |
| } ] |
| }, |
| "queryHistoricalServer" : false, |
| "numSegmentsPerQuery" : -1, |
| "intervalSplits" : [ { |
| "start" : 725846400000, |
| "end" : 883526401000 |
| } ], |
| "outputAttrSpec" : [ { |
| "exprId" : { |
| "id" : 381, |
| "jvmId" : { } |
| }, |
| "name" : "alias-1", |
| "dataType" : { }, |
| "tf" : "toLong" |
| } ] |
|}[alias-1#381L] |
| +- Project [cast(alias-3#375 as timestamp) AS alias-3#375,1 AS gid#374,null AS count(1)#377L] |
| +- SortBasedAggregate(key=[alias-3#375], functions=[], output=[alias-3#375]) |
| +- Scan DruidQuery(307964119): { |
| "q" : { |
| "jsonClass" : "GroupByQuerySpec", |
| "queryType" : "groupBy", |
| "dataSource" : "tpch", |
| "dimensions" : [ { |
| "jsonClass" : "ExtractionDimensionSpec", |
| "type" : "extraction", |
| "dimension" : "__time", |
| "outputName" : "alias-3", |
| "extractionFn" : { |
| "jsonClass" : "TimeFormatExtractionFunctionSpec", |
| "type" : "timeFormat", |
| "format" : "YYYY-MM-dd HH:mm:ss", |
| "timeZone" : "UTC", |
| "locale" : "en_US" |
| } |
| } ], |
| "granularity" : "all", |
| "aggregations" : [ { |
| "jsonClass" : "FunctionAggregationSpec", |
| "type" : "count", |
| "name" : "addCountAggForNoMetricQuery", |
| "fieldName" : "count" |
| } ], |
| "intervals" : [ "1993-01-01T00:00:00.000Z/1997-12-31T00:00:01.000Z" ] |
| }, |
| "queryHistoricalServer" : true, |
| "numSegmentsPerQuery" : 2, |
| "intervalSplits" : [ { |
| "start" : 725846400000, |
| "end" : 883526401000 |
| } ], |
| "outputAttrSpec" : [ { |
| "exprId" : { |
| "id" : 375, |
| "jvmId" : { } |
| }, |
| "name" : "alias-3", |
| "dataType" : { }, |
| "tf" : "toString" |
| } ] |
|}[alias-3#375] |
| |
|DruidQuery(550340200) details :: |
| |
|queryHistorical = false, |
|numSegmentsPerQuery = -1, |
|bestCost = 12.200000000000001, |
|Druid Query Cost Model:: |
| |
|dimsNDVEstimate = 1 |
|shuffleCostPerRow = 1.0, |
|histMergeCostPerRowFactor = 0.2, |
|histSegsPerQueryLimit = 3, |
|queryIntervalRatioScaleFactor = 3.0, |
|historicalTimeSeriesProcessingCostPerRowFactor = 0.1, |
|historicalGByProcessigCostPerRowFactor = 0.25, |
|sparkSchedulingCostPerTask = 1.0, |
|sparkAggregationCostPerRowFactor = 0.15, |
|druidOutputTransportCostPerRowFactor = 0.4, |
|indexIntervalMillis = 157680001000, |
|queryIntervalMillis = 157680001000, |
|segIntervalMillis = 2678400000, |
|sparkCoresPerExecutor = 32, |
|numSparkExecutors = 1, |
|numProcessingThreadsPerHistorical = 32, |
|numHistoricals = 1, |
|querySpecClass = class org.sparklinedata.druid.TimeSeriesQuerySpec |
| histProcessingCost = 0.1 |
|queryOutputSizeEstimate = 1 |
|segmentOutputSizeEstimate = 1 |
|numSegmentsProcessed = 59 |
|numSparkCores = 32 |
|numHistoricalThreads = 32 |
|parallelismPerWave = 32 |
| |
|minCost : |
|numWaves = 2, |
|processingCostPerHist = 0.1, |
|brokerMergeCost = 11.600000000000001, |
|segmentOutputTransportCost = 0.4 |
|queryCost = 12.200000000000001 |
| |
| |
| Cost Details: |
|broker/historical | queryCost | numWaves | numSegmentsPerQuery | druidMergeCostPerWave | transportCostPerWave | druidCostPerWave | totalDruidCost | sparkShuffleCost | sparkAggCost | sparkSchedulingCost |
| broker | 12.2E0 | 1 | all | 11.6E0 | 40000000E-8 | 12.2E0 | 12.2E0 | 0.0 | 0.0 | 0.0 |
| historical | 67.3E0 | 2 | 1 | 0E0 | 40000000E-8 | 50000000E-8 | 1E0 | 2E0 | 30000000E-8 | 64E0 |
| historical | 33.95E0 | 1 | 2 | 20000000E-8 | 40000000E-8 | 80000000E-8 | 80000000E-8 | 1E0 | 15000000E-8 | 32E0 |
| historical | 34.25E0 | 1 | 3 | 40000000E-8 | 40000000E-8 | 1.1E0 | 1.1E0 | 1E0 | 15000000E-8 | 32E0 |
| |
| |
| |
|DruidQuery(53983513) details :: |
| |
|queryHistorical = true, |
|numSegmentsPerQuery = 2, |
|bestCost = 4.831838237750001E9, |
|Druid Query Cost Model:: |
| |
|dimsNDVEstimate = 2147483647 |
|shuffleCostPerRow = 1.0, |
|histMergeCostPerRowFactor = 0.2, |
|histSegsPerQueryLimit = 3, |
|queryIntervalRatioScaleFactor = 3.0, |
|historicalTimeSeriesProcessingCostPerRowFactor = 0.1, |
|historicalGByProcessigCostPerRowFactor = 0.25, |
|sparkSchedulingCostPerTask = 1.0, |
|sparkAggregationCostPerRowFactor = 0.15, |
|druidOutputTransportCostPerRowFactor = 0.4, |
|indexIntervalMillis = 157680001000, |
|queryIntervalMillis = 157680001000, |
|segIntervalMillis = 2678400000, |
|sparkCoresPerExecutor = 32, |
|numSparkExecutors = 1, |
|numProcessingThreadsPerHistorical = 32, |
|numHistoricals = 1, |
|querySpecClass = class org.sparklinedata.druid.GroupByQuerySpec |
| histProcessingCost = 0.25 |
|queryOutputSizeEstimate = 3172093739 |
|segmentOutputSizeEstimate = 2147483647 |
|numSegmentsProcessed = 59 |
|numSparkCores = 32 |
|numHistoricalThreads = 32 |
|parallelismPerWave = 32 |
| |
|minCost : |
|numWaves = 1, |
|numSegmentsPerQuery = 2, |
|estimateOutputSizePerHist = 2147483647, |
|processingCostPerHist = 1.0737418235E9, |
|histMergeCost = 4.2949672940000004E8, |
|segmentOutputTransportCost = 8.589934588000001E8, |
|shuffleCost = 2.147483647E9, |
|sparkSchedulingCost = 32.0, |
|sparkAggCost = 3.2212254705E8, |
|costPerHistoricalWave = 2.3622320117000003E9, |
|druidStageCost = 2.3622320117000003E9, |
|queryCost = 4.831838237750001E9 |
| |
| |
| Cost Details: |
|broker/historical | queryCost | numWaves | numSegmentsPerQuery | druidMergeCostPerWave | transportCostPerWave | druidCostPerWave | totalDruidCost | sparkShuffleCost | sparkAggCost | sparkSchedulingCost |
| broker | 272.5338962E8 | 1 | all | 249.1081031E8 | 12.68837496E8 | 272.5338962E8 | 272.5338962E8 | 0.0 | 0.0 | 0.0 |
| historical | 77.30941193E8 | 2 | 1 | 0E0 | 8.589934588E8 | 13.95864371E8 | 27.91728741E8 | 42.94967294E8 | 6.442450941E8 | 64E0 |
| historical | 48.31838238E8 | 1 | 2 | 4.294967294E8 | 8.589934588E8 | 23.62232012E8 | 23.62232012E8 | 21.47483647E8 | 3.22122547E8 | 32E0 |
| historical | 57.98205879E8 | 1 | 3 | 8.589934588E8 | 8.589934588E8 | 33.28599653E8 | 33.28599653E8 | 21.47483647E8 | 3.22122547E8 | 32E0 |
| |
| |
| |
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+