diff --git a/src/query/src/promql/planner.rs b/src/query/src/promql/planner.rs index 5dabffedc4d4..dfa0431f5d78 100644 --- a/src/query/src/promql/planner.rs +++ b/src/query/src/promql/planner.rs @@ -169,244 +169,9 @@ impl PromPlanner { session_state: &SessionState, ) -> Result { let res = match &prom_expr { - PromExpr::Aggregate(AggregateExpr { - op, - expr, - // TODO(ruihang): support param - param: _param, - modifier, - }) => { - let input = self.prom_expr_to_plan(*expr.clone(), session_state).await?; - - // calculate columns to group by - // Need to append time index column into group by columns - let group_exprs = self.agg_modifier_to_col(input.schema(), modifier)?; - - // convert op and value columns to aggregate exprs - let aggr_exprs = self.create_aggregate_exprs(*op, &input)?; - - // create plan - let group_sort_expr = group_exprs - .clone() - .into_iter() - .map(|expr| expr.sort(true, false)); - LogicalPlanBuilder::from(input) - .aggregate(group_exprs, aggr_exprs) - .context(DataFusionPlanningSnafu)? - .sort(group_sort_expr) - .context(DataFusionPlanningSnafu)? - .build() - .context(DataFusionPlanningSnafu)? - } - PromExpr::Unary(UnaryExpr { expr }) => { - // Unary Expr in PromQL implys the `-` operator - let input = self.prom_expr_to_plan(*expr.clone(), session_state).await?; - self.projection_for_each_field_column(input, |col| { - Ok(DfExpr::Negative(Box::new(DfExpr::Column(col.into())))) - })? - } - PromExpr::Binary(PromBinaryExpr { - lhs, - rhs, - op, - modifier, - }) => { - // if set to true, comparison operator will return 0/1 (for true/false) instead of - // filter on the result column - let should_return_bool = if let Some(m) = modifier { - m.return_bool - } else { - false - }; - let is_comparison_op = Self::is_token_a_comparison_op(*op); - - // we should build a filter plan here if the op is comparison op and need not - // to return 0/1. Otherwise, we should build a projection plan - match ( - Self::try_build_literal_expr(lhs), - Self::try_build_literal_expr(rhs), - ) { - (Some(lhs), Some(rhs)) => { - self.ctx.time_index_column = Some(DEFAULT_TIME_INDEX_COLUMN.to_string()); - self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()]; - self.ctx.reset_table_name_and_schema(); - let field_expr_builder = Self::prom_token_to_binary_expr_builder(*op)?; - let mut field_expr = field_expr_builder(lhs, rhs)?; - - if is_comparison_op && should_return_bool { - field_expr = DfExpr::Cast(Cast { - expr: Box::new(field_expr), - data_type: ArrowDataType::Float64, - }); - } - - LogicalPlan::Extension(Extension { - node: Arc::new( - EmptyMetric::new( - self.ctx.start, - self.ctx.end, - self.ctx.interval, - SPECIAL_TIME_FUNCTION.to_string(), - DEFAULT_FIELD_COLUMN.to_string(), - Some(field_expr), - ) - .context(DataFusionPlanningSnafu)?, - ), - }) - } - // lhs is a literal, rhs is a column - (Some(mut expr), None) => { - let input = self.prom_expr_to_plan(*rhs.clone(), session_state).await?; - // check if the literal is a special time expr - if let Some(time_expr) = Self::try_build_special_time_expr( - lhs, - self.ctx.time_index_column.as_ref().unwrap(), - ) { - expr = time_expr - } - let bin_expr_builder = |col: &String| { - let binary_expr_builder = Self::prom_token_to_binary_expr_builder(*op)?; - let mut binary_expr = - binary_expr_builder(expr.clone(), DfExpr::Column(col.into()))?; - - if is_comparison_op && should_return_bool { - binary_expr = DfExpr::Cast(Cast { - expr: Box::new(binary_expr), - data_type: ArrowDataType::Float64, - }); - } - Ok(binary_expr) - }; - if is_comparison_op && !should_return_bool { - self.filter_on_field_column(input, bin_expr_builder)? - } else { - self.projection_for_each_field_column(input, bin_expr_builder)? - } - } - // lhs is a column, rhs is a literal - (None, Some(mut expr)) => { - let input = self.prom_expr_to_plan(*lhs.clone(), session_state).await?; - // check if the literal is a special time expr - if let Some(time_expr) = Self::try_build_special_time_expr( - rhs, - self.ctx.time_index_column.as_ref().unwrap(), - ) { - expr = time_expr - } - let bin_expr_builder = |col: &String| { - let binary_expr_builder = Self::prom_token_to_binary_expr_builder(*op)?; - let mut binary_expr = - binary_expr_builder(DfExpr::Column(col.into()), expr.clone())?; - - if is_comparison_op && should_return_bool { - binary_expr = DfExpr::Cast(Cast { - expr: Box::new(binary_expr), - data_type: ArrowDataType::Float64, - }); - } - Ok(binary_expr) - }; - if is_comparison_op && !should_return_bool { - self.filter_on_field_column(input, bin_expr_builder)? - } else { - self.projection_for_each_field_column(input, bin_expr_builder)? - } - } - // both are columns. join them on time index - (None, None) => { - let left_input = - self.prom_expr_to_plan(*lhs.clone(), session_state).await?; - let left_field_columns = self.ctx.field_columns.clone(); - let mut left_table_ref = self - .table_ref() - .unwrap_or_else(|_| TableReference::bare("")); - let left_context = self.ctx.clone(); - - let right_input = - self.prom_expr_to_plan(*rhs.clone(), session_state).await?; - let right_field_columns = self.ctx.field_columns.clone(); - let mut right_table_ref = self - .table_ref() - .unwrap_or_else(|_| TableReference::bare("")); - let right_context = self.ctx.clone(); - - // TODO(ruihang): avoid join if left and right are the same table - - // set op has "special" join semantics - if Self::is_token_a_set_op(*op) { - return self.set_op_on_non_field_columns( - left_input, - right_input, - left_context, - right_context, - *op, - modifier, - ); - } - - // normal join - if left_table_ref == right_table_ref { - // rename table references to avoid ambiguity - left_table_ref = TableReference::bare("lhs"); - right_table_ref = TableReference::bare("rhs"); - // `self.ctx` have ctx in right plan, if right plan have no tag, - // we use left plan ctx as the ctx for subsequent calculations, - // to avoid case like `host + scalar(...)` - // we need preserve tag column on `host` table in subsequent projection, - // which only show in left plan ctx. - if self.ctx.tag_columns.is_empty() { - self.ctx = left_context.clone(); - self.ctx.table_name = Some("lhs".to_string()); - } else { - self.ctx.table_name = Some("rhs".to_string()); - } - } - let mut field_columns = - left_field_columns.iter().zip(right_field_columns.iter()); - let join_plan = self.join_on_non_field_columns( - left_input, - right_input, - left_table_ref.clone(), - right_table_ref.clone(), - // if left plan or right plan tag is empty, means case like `scalar(...) + host` or `host + scalar(...)` - // under this case we only join on time index - left_context.tag_columns.is_empty() - || right_context.tag_columns.is_empty(), - )?; - let join_plan_schema = join_plan.schema().clone(); - - let bin_expr_builder = |_: &String| { - let (left_col_name, right_col_name) = field_columns.next().unwrap(); - let left_col = join_plan_schema - .qualified_field_with_name(Some(&left_table_ref), left_col_name) - .context(DataFusionPlanningSnafu)? - .into(); - let right_col = join_plan_schema - .qualified_field_with_name(Some(&right_table_ref), right_col_name) - .context(DataFusionPlanningSnafu)? - .into(); - - let binary_expr_builder = Self::prom_token_to_binary_expr_builder(*op)?; - let mut binary_expr = binary_expr_builder( - DfExpr::Column(left_col), - DfExpr::Column(right_col), - )?; - if is_comparison_op && should_return_bool { - binary_expr = DfExpr::Cast(Cast { - expr: Box::new(binary_expr), - data_type: ArrowDataType::Float64, - }); - } - Ok(binary_expr) - }; - if is_comparison_op && !should_return_bool { - self.filter_on_field_column(join_plan, bin_expr_builder)? - } else { - self.projection_for_each_field_column(join_plan, bin_expr_builder)? - } - } - } - } + PromExpr::Aggregate(expr) => self.prom_aggr_expr_to_plan(session_state, expr).await?, + PromExpr::Unary(expr) => self.prom_unary_expr_to_plan(session_state, expr).await?, + PromExpr::Binary(expr) => self.prom_binary_expr_to_plan(session_state, expr).await?, PromExpr::Paren(ParenExpr { expr }) => { self.prom_expr_to_plan(*expr.clone(), session_state).await? } @@ -414,13 +179,111 @@ impl PromPlanner { name: "Prom Subquery", } .fail()?, - PromExpr::NumberLiteral(NumberLiteral { val }) => { + PromExpr::NumberLiteral(lit) => self.prom_number_lit_to_plan(lit)?, + PromExpr::StringLiteral(lit) => self.prom_string_lit_to_plan(lit)?, + PromExpr::VectorSelector(selector) => { + self.prom_vector_selector_to_plan(selector).await? + } + PromExpr::MatrixSelector(selector) => { + self.prom_matrix_selector_to_plan(selector).await? + } + PromExpr::Call(expr) => self.prom_call_expr_to_plan(session_state, expr).await?, + PromExpr::Extension(expr) => self.prom_ext_expr_to_plan(session_state, expr).await?, + }; + Ok(res) + } + + async fn prom_aggr_expr_to_plan( + &mut self, + session_state: &SessionState, + aggr_expr: &AggregateExpr, + ) -> Result { + let AggregateExpr { + op, + expr, + // TODO(ruihang): support param + param: _param, + modifier, + } = aggr_expr; + + let input = self.prom_expr_to_plan(*expr.clone(), session_state).await?; + + // calculate columns to group by + // Need to append time index column into group by columns + let group_exprs = self.agg_modifier_to_col(input.schema(), modifier)?; + + // convert op and value columns to aggregate exprs + let aggr_exprs = self.create_aggregate_exprs(*op, &input)?; + + // create plan + let group_sort_expr = group_exprs + .clone() + .into_iter() + .map(|expr| expr.sort(true, false)); + LogicalPlanBuilder::from(input) + .aggregate(group_exprs, aggr_exprs) + .context(DataFusionPlanningSnafu)? + .sort(group_sort_expr) + .context(DataFusionPlanningSnafu)? + .build() + .context(DataFusionPlanningSnafu) + } + + async fn prom_unary_expr_to_plan( + &mut self, + session_state: &SessionState, + unary_expr: &UnaryExpr, + ) -> Result { + let UnaryExpr { expr } = unary_expr; + // Unary Expr in PromQL implys the `-` operator + let input = self.prom_expr_to_plan(*expr.clone(), session_state).await?; + self.projection_for_each_field_column(input, |col| { + Ok(DfExpr::Negative(Box::new(DfExpr::Column(col.into())))) + }) + } + + async fn prom_binary_expr_to_plan( + &mut self, + session_state: &SessionState, + binary_expr: &PromBinaryExpr, + ) -> Result { + let PromBinaryExpr { + lhs, + rhs, + op, + modifier, + } = binary_expr; + + // if set to true, comparison operator will return 0/1 (for true/false) instead of + // filter on the result column + let should_return_bool = if let Some(m) = modifier { + m.return_bool + } else { + false + }; + let is_comparison_op = Self::is_token_a_comparison_op(*op); + + // we should build a filter plan here if the op is comparison op and need not + // to return 0/1. Otherwise, we should build a projection plan + match ( + Self::try_build_literal_expr(lhs), + Self::try_build_literal_expr(rhs), + ) { + (Some(lhs), Some(rhs)) => { self.ctx.time_index_column = Some(DEFAULT_TIME_INDEX_COLUMN.to_string()); self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()]; self.ctx.reset_table_name_and_schema(); - let literal_expr = df_prelude::lit(*val); + let field_expr_builder = Self::prom_token_to_binary_expr_builder(*op)?; + let mut field_expr = field_expr_builder(lhs, rhs)?; - LogicalPlan::Extension(Extension { + if is_comparison_op && should_return_bool { + field_expr = DfExpr::Cast(Cast { + expr: Box::new(field_expr), + data_type: ArrowDataType::Float64, + }); + } + + Ok(LogicalPlan::Extension(Extension { node: Arc::new( EmptyMetric::new( self.ctx.start, @@ -428,178 +291,370 @@ impl PromPlanner { self.ctx.interval, SPECIAL_TIME_FUNCTION.to_string(), DEFAULT_FIELD_COLUMN.to_string(), - Some(literal_expr), + Some(field_expr), ) .context(DataFusionPlanningSnafu)?, ), - }) + })) } - PromExpr::StringLiteral(StringLiteral { val }) => { - self.ctx.time_index_column = Some(DEFAULT_TIME_INDEX_COLUMN.to_string()); - self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()]; - self.ctx.reset_table_name_and_schema(); - let literal_expr = df_prelude::lit(val.to_string()); + // lhs is a literal, rhs is a column + (Some(mut expr), None) => { + let input = self.prom_expr_to_plan(*rhs.clone(), session_state).await?; + // check if the literal is a special time expr + if let Some(time_expr) = Self::try_build_special_time_expr( + lhs, + self.ctx.time_index_column.as_ref().unwrap(), + ) { + expr = time_expr + } + let bin_expr_builder = |col: &String| { + let binary_expr_builder = Self::prom_token_to_binary_expr_builder(*op)?; + let mut binary_expr = + binary_expr_builder(expr.clone(), DfExpr::Column(col.into()))?; + + if is_comparison_op && should_return_bool { + binary_expr = DfExpr::Cast(Cast { + expr: Box::new(binary_expr), + data_type: ArrowDataType::Float64, + }); + } + Ok(binary_expr) + }; + if is_comparison_op && !should_return_bool { + self.filter_on_field_column(input, bin_expr_builder) + } else { + self.projection_for_each_field_column(input, bin_expr_builder) + } + } + // lhs is a column, rhs is a literal + (None, Some(mut expr)) => { + let input = self.prom_expr_to_plan(*lhs.clone(), session_state).await?; + // check if the literal is a special time expr + if let Some(time_expr) = Self::try_build_special_time_expr( + rhs, + self.ctx.time_index_column.as_ref().unwrap(), + ) { + expr = time_expr + } + let bin_expr_builder = |col: &String| { + let binary_expr_builder = Self::prom_token_to_binary_expr_builder(*op)?; + let mut binary_expr = + binary_expr_builder(DfExpr::Column(col.into()), expr.clone())?; + + if is_comparison_op && should_return_bool { + binary_expr = DfExpr::Cast(Cast { + expr: Box::new(binary_expr), + data_type: ArrowDataType::Float64, + }); + } + Ok(binary_expr) + }; + if is_comparison_op && !should_return_bool { + self.filter_on_field_column(input, bin_expr_builder) + } else { + self.projection_for_each_field_column(input, bin_expr_builder) + } + } + // both are columns. join them on time index + (None, None) => { + let left_input = self.prom_expr_to_plan(*lhs.clone(), session_state).await?; + let left_field_columns = self.ctx.field_columns.clone(); + let mut left_table_ref = self + .table_ref() + .unwrap_or_else(|_| TableReference::bare("")); + let left_context = self.ctx.clone(); + + let right_input = self.prom_expr_to_plan(*rhs.clone(), session_state).await?; + let right_field_columns = self.ctx.field_columns.clone(); + let mut right_table_ref = self + .table_ref() + .unwrap_or_else(|_| TableReference::bare("")); + let right_context = self.ctx.clone(); + + // TODO(ruihang): avoid join if left and right are the same table + + // set op has "special" join semantics + if Self::is_token_a_set_op(*op) { + return self.set_op_on_non_field_columns( + left_input, + right_input, + left_context, + right_context, + *op, + modifier, + ); + } - LogicalPlan::Extension(Extension { - node: Arc::new( - EmptyMetric::new( - self.ctx.start, - self.ctx.end, - self.ctx.interval, - SPECIAL_TIME_FUNCTION.to_string(), - DEFAULT_FIELD_COLUMN.to_string(), - Some(literal_expr), - ) - .context(DataFusionPlanningSnafu)?, - ), - }) + // normal join + if left_table_ref == right_table_ref { + // rename table references to avoid ambiguity + left_table_ref = TableReference::bare("lhs"); + right_table_ref = TableReference::bare("rhs"); + // `self.ctx` have ctx in right plan, if right plan have no tag, + // we use left plan ctx as the ctx for subsequent calculations, + // to avoid case like `host + scalar(...)` + // we need preserve tag column on `host` table in subsequent projection, + // which only show in left plan ctx. + if self.ctx.tag_columns.is_empty() { + self.ctx = left_context.clone(); + self.ctx.table_name = Some("lhs".to_string()); + } else { + self.ctx.table_name = Some("rhs".to_string()); + } + } + let mut field_columns = left_field_columns.iter().zip(right_field_columns.iter()); + let join_plan = self.join_on_non_field_columns( + left_input, + right_input, + left_table_ref.clone(), + right_table_ref.clone(), + // if left plan or right plan tag is empty, means case like `scalar(...) + host` or `host + scalar(...)` + // under this case we only join on time index + left_context.tag_columns.is_empty() || right_context.tag_columns.is_empty(), + )?; + let join_plan_schema = join_plan.schema().clone(); + + let bin_expr_builder = |_: &String| { + let (left_col_name, right_col_name) = field_columns.next().unwrap(); + let left_col = join_plan_schema + .qualified_field_with_name(Some(&left_table_ref), left_col_name) + .context(DataFusionPlanningSnafu)? + .into(); + let right_col = join_plan_schema + .qualified_field_with_name(Some(&right_table_ref), right_col_name) + .context(DataFusionPlanningSnafu)? + .into(); + + let binary_expr_builder = Self::prom_token_to_binary_expr_builder(*op)?; + let mut binary_expr = + binary_expr_builder(DfExpr::Column(left_col), DfExpr::Column(right_col))?; + if is_comparison_op && should_return_bool { + binary_expr = DfExpr::Cast(Cast { + expr: Box::new(binary_expr), + data_type: ArrowDataType::Float64, + }); + } + Ok(binary_expr) + }; + if is_comparison_op && !should_return_bool { + self.filter_on_field_column(join_plan, bin_expr_builder) + } else { + self.projection_for_each_field_column(join_plan, bin_expr_builder) + } } - PromExpr::VectorSelector(VectorSelector { - name, - offset, - matchers, - at: _, - }) => { - let matchers = self.preprocess_label_matchers(matchers, name)?; - self.setup_context().await?; - let normalize = self - .selector_to_series_normalize_plan(offset, matchers, false) - .await?; - let manipulate = InstantManipulate::new( + } + } + + fn prom_number_lit_to_plan(&mut self, number_literal: &NumberLiteral) -> Result { + let NumberLiteral { val } = number_literal; + self.ctx.time_index_column = Some(DEFAULT_TIME_INDEX_COLUMN.to_string()); + self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()]; + self.ctx.reset_table_name_and_schema(); + let literal_expr = df_prelude::lit(*val); + + let plan = LogicalPlan::Extension(Extension { + node: Arc::new( + EmptyMetric::new( self.ctx.start, self.ctx.end, - self.ctx.lookback_delta, self.ctx.interval, - self.ctx - .time_index_column - .clone() - .expect("time index should be set in `setup_context`"), - self.ctx.field_columns.first().cloned(), - normalize, - ); - LogicalPlan::Extension(Extension { - node: Arc::new(manipulate), - }) - } - PromExpr::MatrixSelector(MatrixSelector { vs, range }) => { - let VectorSelector { - name, - offset, - matchers, - .. - } = vs; - let matchers = self.preprocess_label_matchers(matchers, name)?; - self.setup_context().await?; - - ensure!(!range.is_zero(), ZeroRangeSelectorSnafu); - let range_ms = range.as_millis() as _; - self.ctx.range = Some(range_ms); - - let normalize = self - .selector_to_series_normalize_plan(offset, matchers, true) - .await?; - let manipulate = RangeManipulate::new( + SPECIAL_TIME_FUNCTION.to_string(), + DEFAULT_FIELD_COLUMN.to_string(), + Some(literal_expr), + ) + .context(DataFusionPlanningSnafu)?, + ), + }); + Ok(plan) + } + + fn prom_string_lit_to_plan(&mut self, string_literal: &StringLiteral) -> Result { + let StringLiteral { val } = string_literal; + self.ctx.time_index_column = Some(DEFAULT_TIME_INDEX_COLUMN.to_string()); + self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()]; + self.ctx.reset_table_name_and_schema(); + let literal_expr = df_prelude::lit(val.to_string()); + + let plan = LogicalPlan::Extension(Extension { + node: Arc::new( + EmptyMetric::new( self.ctx.start, self.ctx.end, self.ctx.interval, - // TODO(ruihang): convert via Timestamp datatypes to support different time units - range_ms, - self.ctx - .time_index_column - .clone() - .expect("time index should be set in `setup_context`"), - self.ctx.field_columns.clone(), - normalize, + SPECIAL_TIME_FUNCTION.to_string(), + DEFAULT_FIELD_COLUMN.to_string(), + Some(literal_expr), ) - .context(DataFusionPlanningSnafu)?; + .context(DataFusionPlanningSnafu)?, + ), + }); + Ok(plan) + } - LogicalPlan::Extension(Extension { - node: Arc::new(manipulate), - }) - } - PromExpr::Call(Call { func, args }) => { - // some special functions that are not expression but a plan - match func.name { - SPECIAL_HISTOGRAM_QUANTILE => { - return self.create_histogram_plan(args, session_state).await - } - SPECIAL_VECTOR_FUNCTION => return self.create_vector_plan(args).await, - SCALAR_FUNCTION => return self.create_scalar_plan(args, session_state).await, - _ => {} - } + async fn prom_vector_selector_to_plan( + &mut self, + vector_selector: &VectorSelector, + ) -> Result { + let VectorSelector { + name, + offset, + matchers, + at: _, + } = vector_selector; + let matchers = self.preprocess_label_matchers(matchers, name)?; + self.setup_context().await?; + let normalize = self + .selector_to_series_normalize_plan(offset, matchers, false) + .await?; + let manipulate = InstantManipulate::new( + self.ctx.start, + self.ctx.end, + self.ctx.lookback_delta, + self.ctx.interval, + self.ctx + .time_index_column + .clone() + .expect("time index should be set in `setup_context`"), + self.ctx.field_columns.first().cloned(), + normalize, + ); + Ok(LogicalPlan::Extension(Extension { + node: Arc::new(manipulate), + })) + } - // transform function arguments - let args = self.create_function_args(&args.args)?; - let input = if let Some(prom_expr) = args.input { - self.prom_expr_to_plan(prom_expr, session_state).await? - } else { - self.ctx.time_index_column = Some(SPECIAL_TIME_FUNCTION.to_string()); - self.ctx.reset_table_name_and_schema(); - LogicalPlan::Extension(Extension { - node: Arc::new( - EmptyMetric::new( - self.ctx.start, - self.ctx.end, - self.ctx.interval, - SPECIAL_TIME_FUNCTION.to_string(), - DEFAULT_FIELD_COLUMN.to_string(), - None, - ) - .context(DataFusionPlanningSnafu)?, - ), - }) - }; - let mut func_exprs = - self.create_function_expr(func, args.literals, session_state)?; - func_exprs.insert(0, self.create_time_index_column_expr()?); - func_exprs.extend_from_slice(&self.create_tag_column_exprs()?); - - LogicalPlanBuilder::from(input) - .project(func_exprs) - .context(DataFusionPlanningSnafu)? - .filter(self.create_empty_values_filter_expr()?) - .context(DataFusionPlanningSnafu)? - .build() - .context(DataFusionPlanningSnafu)? - } - PromExpr::Extension(promql_parser::parser::ast::Extension { expr }) => { - let children = expr.children(); - let plan = self - .prom_expr_to_plan(children[0].clone(), session_state) - .await?; - // Wrapper for the explanation/analyze of the existing plan - // https://docs.rs/datafusion-expr/latest/datafusion_expr/logical_plan/builder/struct.LogicalPlanBuilder.html#method.explain - // if `analyze` is true, runs the actual plan and produces - // information about metrics during run. - // if `verbose` is true, prints out additional details when VERBOSE keyword is specified - match expr.name() { - "ANALYZE" => LogicalPlanBuilder::from(plan) - .explain(false, true) - .unwrap() - .build() - .context(DataFusionPlanningSnafu)?, - "ANALYZE VERBOSE" => LogicalPlanBuilder::from(plan) - .explain(true, true) - .unwrap() - .build() - .context(DataFusionPlanningSnafu)?, - "EXPLAIN" => LogicalPlanBuilder::from(plan) - .explain(false, false) - .unwrap() - .build() - .context(DataFusionPlanningSnafu)?, - "EXPLAIN VERBOSE" => LogicalPlanBuilder::from(plan) - .explain(true, false) - .unwrap() - .build() - .context(DataFusionPlanningSnafu)?, - _ => LogicalPlanBuilder::empty(true) - .build() - .context(DataFusionPlanningSnafu)?, - } + async fn prom_matrix_selector_to_plan( + &mut self, + matrix_selector: &MatrixSelector, + ) -> Result { + let MatrixSelector { vs, range } = matrix_selector; + let VectorSelector { + name, + offset, + matchers, + .. + } = vs; + let matchers = self.preprocess_label_matchers(matchers, name)?; + self.setup_context().await?; + + ensure!(!range.is_zero(), ZeroRangeSelectorSnafu); + let range_ms = range.as_millis() as _; + self.ctx.range = Some(range_ms); + + let normalize = self + .selector_to_series_normalize_plan(offset, matchers, true) + .await?; + let manipulate = RangeManipulate::new( + self.ctx.start, + self.ctx.end, + self.ctx.interval, + // TODO(ruihang): convert via Timestamp datatypes to support different time units + range_ms, + self.ctx + .time_index_column + .clone() + .expect("time index should be set in `setup_context`"), + self.ctx.field_columns.clone(), + normalize, + ) + .context(DataFusionPlanningSnafu)?; + + Ok(LogicalPlan::Extension(Extension { + node: Arc::new(manipulate), + })) + } + + async fn prom_call_expr_to_plan( + &mut self, + session_state: &SessionState, + call_expr: &Call, + ) -> Result { + let Call { func, args } = call_expr; + // some special functions that are not expression but a plan + match func.name { + SPECIAL_HISTOGRAM_QUANTILE => { + return self.create_histogram_plan(args, session_state).await } + SPECIAL_VECTOR_FUNCTION => return self.create_vector_plan(args).await, + SCALAR_FUNCTION => return self.create_scalar_plan(args, session_state).await, + _ => {} + } + + // transform function arguments + let args = self.create_function_args(&args.args)?; + let input = if let Some(prom_expr) = args.input { + self.prom_expr_to_plan(prom_expr, session_state).await? + } else { + self.ctx.time_index_column = Some(SPECIAL_TIME_FUNCTION.to_string()); + self.ctx.reset_table_name_and_schema(); + LogicalPlan::Extension(Extension { + node: Arc::new( + EmptyMetric::new( + self.ctx.start, + self.ctx.end, + self.ctx.interval, + SPECIAL_TIME_FUNCTION.to_string(), + DEFAULT_FIELD_COLUMN.to_string(), + None, + ) + .context(DataFusionPlanningSnafu)?, + ), + }) }; - Ok(res) + let mut func_exprs = self.create_function_expr(func, args.literals, session_state)?; + func_exprs.insert(0, self.create_time_index_column_expr()?); + func_exprs.extend_from_slice(&self.create_tag_column_exprs()?); + + LogicalPlanBuilder::from(input) + .project(func_exprs) + .context(DataFusionPlanningSnafu)? + .filter(self.create_empty_values_filter_expr()?) + .context(DataFusionPlanningSnafu)? + .build() + .context(DataFusionPlanningSnafu) + } + + async fn prom_ext_expr_to_plan( + &mut self, + session_state: &SessionState, + ext_expr: &promql_parser::parser::ast::Extension, + ) -> Result { + // let promql_parser::parser::ast::Extension { expr } = ext_expr; + let expr = &ext_expr.expr; + let children = expr.children(); + let plan = self + .prom_expr_to_plan(children[0].clone(), session_state) + .await?; + // Wrapper for the explanation/analyze of the existing plan + // https://docs.rs/datafusion-expr/latest/datafusion_expr/logical_plan/builder/struct.LogicalPlanBuilder.html#method.explain + // if `analyze` is true, runs the actual plan and produces + // information about metrics during run. + // if `verbose` is true, prints out additional details when VERBOSE keyword is specified + match expr.name() { + "ANALYZE" => LogicalPlanBuilder::from(plan) + .explain(false, true) + .unwrap() + .build() + .context(DataFusionPlanningSnafu), + "ANALYZE VERBOSE" => LogicalPlanBuilder::from(plan) + .explain(true, true) + .unwrap() + .build() + .context(DataFusionPlanningSnafu), + "EXPLAIN" => LogicalPlanBuilder::from(plan) + .explain(false, false) + .unwrap() + .build() + .context(DataFusionPlanningSnafu), + "EXPLAIN VERBOSE" => LogicalPlanBuilder::from(plan) + .explain(true, false) + .unwrap() + .build() + .context(DataFusionPlanningSnafu), + _ => LogicalPlanBuilder::empty(true) + .build() + .context(DataFusionPlanningSnafu), + } } /// Extract metric name from `__name__` matcher and set it into [PromPlannerContext].