Skip to content

Commit

Permalink
perf: accelerate scatter query (GreptimeTeam#4607)
Browse files Browse the repository at this point in the history
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
  • Loading branch information
waynexia authored and CookiePieWw committed Sep 17, 2024
1 parent 40b2462 commit 2ac6d25
Showing 1 changed file with 57 additions and 14 deletions.
71 changes: 57 additions & 14 deletions src/query/src/promql/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,12 @@ const FIELD_COLUMN_MATCHER: &str = "__field__";
/// Special modifier for cross schema query
const SCHEMA_COLUMN_MATCHER: &str = "__schema__";

/// Threshold for scatter scan mode: the maximum number of evaluation steps
/// (`(end - start) / interval`) for which the time index filter is built as
/// separate per-step ranges. Queries with more steps scan one continuous
/// time range instead.
const MAX_SCATTER_POINTS: i64 = 400;

/// One hour in milliseconds. Queries whose step interval is not larger than
/// this always scan one continuous time range.
const INTERVAL_1H: i64 = 60 * 60 * 1000;

#[derive(Default, Debug, Clone)]
struct PromPlannerContext {
// query parameters
Expand Down Expand Up @@ -726,21 +732,10 @@ impl PromPlanner {
Some(Offset::Neg(duration)) => -(duration.as_millis() as Millisecond),
None => 0,
};
let range_ms = self.ctx.range.unwrap_or_default();
let mut scan_filters = self.matchers_to_expr(label_matchers.clone())?;
scan_filters.push(self.create_time_index_column_expr()?.gt_eq(DfExpr::Literal(
ScalarValue::TimestampMillisecond(
Some(self.ctx.start - offset_duration - self.ctx.lookback_delta - range_ms),
None,
),
)));
scan_filters.push(self.create_time_index_column_expr()?.lt_eq(DfExpr::Literal(
ScalarValue::TimestampMillisecond(
Some(self.ctx.end - offset_duration + self.ctx.lookback_delta),
None,
),
)));

if let Some(time_index_filter) = self.build_time_index_filter(offset_duration)? {
scan_filters.push(time_index_filter);
}
// make table scan with filter exprs
let table_ref = self.table_ref()?;
let mut table_scan = self
Expand Down Expand Up @@ -967,6 +962,54 @@ impl PromPlanner {
Ok(table_ref)
}

/// Builds the time index predicate that is pushed down to the table scan.
///
/// Two shapes are produced, depending on the query parameters in `self.ctx`:
///
/// - **Continuous**: a single range
///   `[start - offset - lookback - range, end - offset + lookback]`.
///   Used when the step interval is at most [`INTERVAL_1H`], when the query
///   has more than [`MAX_SCATTER_POINTS`] steps, or when the parameters are
///   degenerate (non-positive interval, `end < start`).
/// - **Scatter**: one range
///   `[t - offset - lookback - range, t - offset + lookback]` per evaluation
///   timestamp `t` in `start, start + interval, ..., <= end`, combined
///   with `OR`. Used for sparse queries so the gaps between evaluation
///   points are not scanned.
///
/// `offset_duration` is the offset modifier in milliseconds (positive values
/// shift the window into the past).
///
/// # Errors
///
/// Propagates the error from `create_time_index_column_expr`.
fn build_time_index_filter(&self, offset_duration: i64) -> Result<Option<DfExpr>> {
    let start = self.ctx.start;
    let end = self.ctx.end;
    let lookback_delta = self.ctx.lookback_delta;
    let range = self.ctx.range.unwrap_or_default();
    let interval = self.ctx.interval;
    let time_index_expr = self.create_time_index_column_expr()?;

    // Builds `time_index >= first - offset - lookback - range
    //         AND time_index <= last - offset + lookback`.
    let time_window = |first: i64, last: i64| {
        time_index_expr
            .clone()
            .gt_eq(DfExpr::Literal(ScalarValue::TimestampMillisecond(
                Some(first - offset_duration - lookback_delta - range),
                None,
            )))
            .and(time_index_expr.clone().lt_eq(DfExpr::Literal(
                ScalarValue::TimestampMillisecond(
                    Some(last - offset_duration + lookback_delta),
                    None,
                ),
            )))
    };

    // Scatter mode needs a step strictly larger than one hour and a bounded
    // number of points. `interval > INTERVAL_1H` is checked first so the
    // division can never see a zero (or negative) interval, and
    // `end >= start` keeps inverted ranges on the continuous path instead of
    // returning no filter at all.
    let scatter_step = if interval > INTERVAL_1H
        && end >= start
        && (end - start) / interval <= MAX_SCATTER_POINTS
    {
        usize::try_from(interval).ok()
    } else {
        None
    };

    let filter = match scatter_step {
        // Scan scatter ranges separately. The range is inclusive because the
        // evaluation timestamp at `end` is part of the query and needs its
        // own window.
        Some(step) => (start..=end)
            .step_by(step)
            .map(|timestamp| time_window(timestamp, timestamp))
            .reduce(DfExpr::or),
        // Scan a continuous time range.
        None => Some(time_window(start, end)),
    };

    Ok(filter)
}

/// Create a table scan plan and a filter plan with given filter.
///
/// # Panic
Expand Down

0 comments on commit 2ac6d25

Please sign in to comment.