diff --git a/src/query/src/promql/planner.rs b/src/query/src/promql/planner.rs index 964fd50e7c06..e42449d0dcb3 100644 --- a/src/query/src/promql/planner.rs +++ b/src/query/src/promql/planner.rs @@ -90,6 +90,12 @@ const FIELD_COLUMN_MATCHER: &str = "__field__"; /// Special modifier for cross schema query const SCHEMA_COLUMN_MATCHER: &str = "__schema__"; +/// Threshold for scatter scan mode +const MAX_SCATTER_POINTS: i64 = 400; + +/// Interval 1 hour in millisecond +const INTERVAL_1H: i64 = 60 * 60 * 1000; + #[derive(Default, Debug, Clone)] struct PromPlannerContext { // query parameters @@ -726,21 +732,10 @@ impl PromPlanner { Some(Offset::Neg(duration)) => -(duration.as_millis() as Millisecond), None => 0, }; - let range_ms = self.ctx.range.unwrap_or_default(); let mut scan_filters = self.matchers_to_expr(label_matchers.clone())?; - scan_filters.push(self.create_time_index_column_expr()?.gt_eq(DfExpr::Literal( - ScalarValue::TimestampMillisecond( - Some(self.ctx.start - offset_duration - self.ctx.lookback_delta - range_ms), - None, - ), - ))); - scan_filters.push(self.create_time_index_column_expr()?.lt_eq(DfExpr::Literal( - ScalarValue::TimestampMillisecond( - Some(self.ctx.end - offset_duration + self.ctx.lookback_delta), - None, - ), - ))); - + if let Some(time_index_filter) = self.build_time_index_filter(offset_duration)? { + scan_filters.push(time_index_filter); + } // make table scan with filter exprs let table_ref = self.table_ref()?; let mut table_scan = self @@ -967,6 +962,54 @@ impl PromPlanner { Ok(table_ref) } + fn build_time_index_filter(&self, offset_duration: i64) -> Result> { + let start = self.ctx.start; + let end = self.ctx.end; + let lookback_delta = self.ctx.lookback_delta; + let range = self.ctx.range.unwrap_or_default(); + let interval = self.ctx.interval; + let time_index_expr = self.create_time_index_column_expr()?; + let num_points = (end - start) / interval; + + // Scan a continuous time range + if (end - start) / interval > MAX_SCATTER_POINTS || interval <= INTERVAL_1H { + let single_time_range = time_index_expr + .clone() + .gt_eq(DfExpr::Literal(ScalarValue::TimestampMillisecond( + Some(self.ctx.start - offset_duration - self.ctx.lookback_delta - range), + None, + ))) + .and( + time_index_expr.lt_eq(DfExpr::Literal(ScalarValue::TimestampMillisecond( + Some(self.ctx.end - offset_duration + self.ctx.lookback_delta), + None, + ))), + ); + return Ok(Some(single_time_range)); + } + + // Otherwise scan scatter ranges separately + let mut filters = Vec::with_capacity(num_points as usize); + for timestamp in (start..end).step_by(interval as usize) { + filters.push( + time_index_expr + .clone() + .gt_eq(DfExpr::Literal(ScalarValue::TimestampMillisecond( + Some(timestamp - offset_duration - lookback_delta - range), + None, + ))) + .and(time_index_expr.clone().lt_eq(DfExpr::Literal( + ScalarValue::TimestampMillisecond( + Some(timestamp - offset_duration + lookback_delta), + None, + ), + ))), + ) + } + + Ok(filters.into_iter().reduce(DfExpr::or)) + } + /// Create a table scan plan and a filter plan with given filter. /// /// # Panic