Skip to content

Commit

Permalink
feat: support bool operator with other computation (GreptimeTeam#1844)
Browse files Browse the repository at this point in the history
* add some cases

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix sqlness test

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* impl atan2 and power

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix instant manipulator

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix clippy

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
  • Loading branch information
waynexia authored and paomian committed Oct 19, 2023
1 parent c2dbc33 commit 6c4e1a2
Show file tree
Hide file tree
Showing 8 changed files with 315 additions and 74 deletions.
130 changes: 98 additions & 32 deletions src/promql/src/extension_plan/instant_manipulate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,7 @@ impl Stream for InstantManipulateStream {

impl InstantManipulateStream {
// refer to Go version: https://github.com/prometheus/prometheus/blob/e934d0f01158a1d55fa0ebb035346b195fcc1260/promql/engine.go#L1571
// and the function `vectorSelectorSingle`
pub fn manipulate(&self, input: RecordBatch) -> DataFusionResult<RecordBatch> {
let mut take_indices = Vec::with_capacity(input.num_rows());
// TODO(ruihang): maybe the input is not timestamp millisecond array
Expand Down Expand Up @@ -373,28 +374,36 @@ impl InstantManipulateStream {
if cursor == ts_column.len() {
cursor -= 1;
}
// then, search backward to lookback
loop {
let curr = ts_column.value(cursor);
if let Some(field_column) = &field_column && field_column.value(cursor).is_nan() {
// if the newest value is NaN, it means the value is stale, so we should not use it
take_indices.push(None);
break;
}
if curr + self.lookback_delta < expected_ts {
// not found in lookback, leave this field blank.
take_indices.push(None);
break;
} else if curr < expected_ts && curr + self.lookback_delta >= expected_ts {
// find the expected value, push and break
take_indices.push(Some(cursor as u64));
break;
} else if cursor == 0 {
// reach the first value and not found in lookback, leave this field blank

// then examine the value
let curr_ts = ts_column.value(cursor);
if curr_ts + self.lookback_delta < expected_ts {
take_indices.push(None);
continue;
}
if curr_ts > expected_ts {
// exceeds current expected timestamp, examine the previous value
if let Some(prev_cursor) = cursor.checked_sub(1) {
let prev_ts = ts_column.value(prev_cursor);
if prev_ts + self.lookback_delta < expected_ts {
// not found in lookback, leave this field blank.
take_indices.push(None);
} else if let Some(field_column) = &field_column && field_column.value(prev_cursor).is_nan() {
// if the newest value is NaN, it means the value is stale, so we should not use it
take_indices.push(None);
} else {
// use this point
take_indices.push(Some(prev_cursor as u64));
}
} else {
take_indices.push(None);
break;
}
cursor -= 1;
} else if let Some(field_column) = &field_column && field_column.value(cursor).is_nan() {
// if the newest value is NaN, it means the value is stale, so we should not use it
take_indices.push(None);
} else {
// use this point
take_indices.push(Some(cursor as u64));
}
}

Expand Down Expand Up @@ -478,15 +487,20 @@ mod test {
lookback_delta: Millisecond,
interval: Millisecond,
expected: String,
contains_nan: bool,
) {
let memory_exec = Arc::new(prepare_test_data());
let memory_exec = if contains_nan {
Arc::new(prepare_test_data_with_nan())
} else {
Arc::new(prepare_test_data())
};
let normalize_exec = Arc::new(InstantManipulateExec {
start,
end,
lookback_delta,
interval,
time_index_column: TIME_INDEX_COLUMN.to_string(),
field_column: None,
field_column: Some("value".to_string()),
input: memory_exec,
metric: ExecutionPlanMetricsSet::new(),
});
Expand Down Expand Up @@ -517,7 +531,7 @@ mod test {
\n| 1970-01-01T00:05:00 | 1.0 | foo |\
\n+---------------------+-------+------+",
);
do_normalize_test(0, 310_000, 10_000, 30_000, expected).await;
do_normalize_test(0, 310_000, 10_000, 30_000, expected, false).await;
}

#[tokio::test]
Expand All @@ -544,7 +558,7 @@ mod test {
\n| 1970-01-01T00:05:00 | 1.0 | foo |\
\n+---------------------+-------+------+",
);
do_normalize_test(0, 300_000, 10_000, 10_000, expected).await;
do_normalize_test(0, 300_000, 10_000, 10_000, expected, false).await;
}

#[tokio::test]
Expand All @@ -566,7 +580,7 @@ mod test {
\n| 1970-01-01T00:05:00 | 1.0 | foo |\
\n+---------------------+-------+------+",
);
do_normalize_test(0, 300_000, 30_000, 30_000, expected).await;
do_normalize_test(0, 300_000, 30_000, 30_000, expected, false).await;
}

#[tokio::test]
Expand Down Expand Up @@ -604,7 +618,7 @@ mod test {
\n| 1970-01-01T00:05:00 | 1.0 | foo |\
\n+---------------------+-------+------+",
);
do_normalize_test(0, 300_000, 30_000, 10_000, expected).await;
do_normalize_test(0, 300_000, 30_000, 10_000, expected, false).await;
}

#[tokio::test]
Expand Down Expand Up @@ -646,7 +660,7 @@ mod test {
\n| 1970-01-01T00:05:00 | 1.0 | foo |\
\n+---------------------+-------+------+",
);
do_normalize_test(0, 300_000, 60_000, 10_000, expected).await;
do_normalize_test(0, 300_000, 60_000, 10_000, expected, false).await;
}

#[tokio::test]
Expand All @@ -668,7 +682,7 @@ mod test {
\n| 1970-01-01T00:05:00 | 1.0 | foo |\
\n+---------------------+-------+------+",
);
do_normalize_test(0, 300_000, 60_000, 30_000, expected).await;
do_normalize_test(0, 300_000, 60_000, 30_000, expected, false).await;
}

#[tokio::test]
Expand All @@ -681,7 +695,7 @@ mod test {
\n| 1970-01-01T00:04:01 | 1.0 | foo |\
\n+---------------------+-------+------+",
);
do_normalize_test(230_000, 245_000, 0, 1_000, expected).await;
do_normalize_test(230_000, 245_000, 0, 1_000, expected, false).await;
}

#[tokio::test]
Expand All @@ -695,7 +709,7 @@ mod test {
\n| 1970-01-01T00:00:30 | 1.0 | foo |\
\n+---------------------+-------+------+",
);
do_normalize_test(0, 30_000, 10_000, 10_000, expected).await;
do_normalize_test(0, 30_000, 10_000, 10_000, expected, false).await;
}

#[tokio::test]
Expand All @@ -712,7 +726,7 @@ mod test {
\n| 1970-01-01T00:05:00 | 1.0 | foo |\
\n+---------------------+-------+------+",
);
do_normalize_test(-900_000, 900_000, 30_000, 60_000, expected).await;
do_normalize_test(-900_000, 900_000, 30_000, 60_000, expected, false).await;
}

#[tokio::test]
Expand All @@ -733,6 +747,58 @@ mod test {
\n| 1970-01-01T00:05:00 | 1.0 | foo |\
\n+---------------------+-------+------+",
);
do_normalize_test(190_000, 300_000, 30_000, 10_000, expected).await;
do_normalize_test(190_000, 300_000, 30_000, 10_000, expected, false).await;
}

/// Builds an in-memory input for the `*_with_nan` test cases: a timestamp
/// column spaced every 30s and a nullable `value` column where every other
/// point is `f64::NAN`. Per the manipulate logic under test, a NaN value
/// marks the point as stale and must not be selected within the lookback.
fn prepare_test_data_with_nan() -> MemoryExec {
let schema = Arc::new(Schema::new(vec![
Field::new(TIME_INDEX_COLUMN, TimestampMillisecondType::DATA_TYPE, true),
Field::new("value", DataType::Float64, true),
]));
// Timestamps at 0s, 30s, 60s, 90s and 120s (milliseconds).
let timestamp_column = Arc::new(TimestampMillisecondArray::from_slice([
0, 30_000, 60_000, 90_000, 120_000, // every 30s
])) as _;
// Values alternate real / NaN so the lookback search has to skip stale points.
let field_column = Arc::new(Float64Array::from_slice([
0.0,
f64::NAN,
6.0,
f64::NAN,
12.0,
])) as _;
let data =
RecordBatch::try_new(schema.clone(), vec![timestamp_column, field_column]).unwrap();

// Single-partition, single-batch execution plan wrapping the batch above.
MemoryExec::try_new(&[vec![data]], schema, None).unwrap()
}

#[tokio::test]
async fn lookback_10s_interval_10s_with_nan() {
// With a 10s lookback and 10s step over the NaN-interleaved data, only the
// non-NaN points (0s -> 0.0, 60s -> 6.0, 120s -> 12.0) are visible, each
// matched at its own timestamp and again 10s later; timestamps whose only
// in-lookback candidate is NaN (30s, 90s) produce no output row.
let expected = String::from(
"+---------------------+-------+\
\n| timestamp | value |\
\n+---------------------+-------+\
\n| 1970-01-01T00:00:00 | 0.0 |\
\n| 1970-01-01T00:00:10 | 0.0 |\
\n| 1970-01-01T00:01:00 | 6.0 |\
\n| 1970-01-01T00:01:10 | 6.0 |\
\n| 1970-01-01T00:02:00 | 12.0 |\
\n| 1970-01-01T00:02:10 | 12.0 |\
\n+---------------------+-------+",
);
// (start, end, lookback_delta, interval, expected, contains_nan=true)
do_normalize_test(0, 300_000, 10_000, 10_000, expected, true).await;
}

#[tokio::test]
async fn lookback_10s_interval_10s_with_nan_unaligned() {
// Same data as above but the evaluation grid starts at 1ms, so no expected
// timestamp coincides exactly with a data point. Each non-NaN point is
// picked up once, 1ms after it occurs, via the 10s lookback; NaN (stale)
// points are never emitted.
let expected = String::from(
"+-------------------------+-------+\
\n| timestamp | value |\
\n+-------------------------+-------+\
\n| 1970-01-01T00:00:00.001 | 0.0 |\
\n| 1970-01-01T00:01:00.001 | 6.0 |\
\n| 1970-01-01T00:02:00.001 | 12.0 |\
\n+-------------------------+-------+",
);
// (start, end, lookback_delta, interval, expected, contains_nan=true)
do_normalize_test(1, 300_001, 10_000, 10_000, expected, true).await;
}
}
109 changes: 67 additions & 42 deletions src/promql/src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,11 +184,8 @@ impl PromPlanner {
self.ctx.time_index_column = Some(DEFAULT_TIME_INDEX_COLUMN.to_string());
self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];
self.ctx.table_name = Some(String::new());
let field_expr = DfExpr::BinaryExpr(BinaryExpr {
left: Box::new(lhs),
op: Self::prom_token_to_binary_op(*op)?,
right: Box::new(rhs),
});
let field_expr_builder = Self::prom_token_to_binary_expr_builder(*op)?;
let field_expr = field_expr_builder(lhs, rhs)?;

LogicalPlan::Extension(Extension {
node: Arc::new(
Expand All @@ -208,11 +205,10 @@ impl PromPlanner {
(Some(expr), None) => {
let input = self.prom_expr_to_plan(*rhs.clone()).await?;
let bin_expr_builder = |col: &String| {
let mut binary_expr = DfExpr::BinaryExpr(BinaryExpr {
left: Box::new(expr.clone()),
op: Self::prom_token_to_binary_op(*op)?,
right: Box::new(DfExpr::Column(col.into())),
});
let binary_expr_builder = Self::prom_token_to_binary_expr_builder(*op)?;
let mut binary_expr =
binary_expr_builder(expr.clone(), DfExpr::Column(col.into()))?;

if is_comparison_op && should_return_bool {
binary_expr = DfExpr::Cast(Cast {
expr: Box::new(binary_expr),
Expand All @@ -231,11 +227,10 @@ impl PromPlanner {
(None, Some(expr)) => {
let input = self.prom_expr_to_plan(*lhs.clone()).await?;
let bin_expr_builder = |col: &String| {
let mut binary_expr = DfExpr::BinaryExpr(BinaryExpr {
left: Box::new(DfExpr::Column(col.into())),
op: Self::prom_token_to_binary_op(*op)?,
right: Box::new(expr.clone()),
});
let binary_expr_builder = Self::prom_token_to_binary_expr_builder(*op)?;
let mut binary_expr =
binary_expr_builder(DfExpr::Column(col.into()), expr.clone())?;

if is_comparison_op && should_return_bool {
binary_expr = DfExpr::Cast(Cast {
expr: Box::new(binary_expr),
Expand Down Expand Up @@ -275,11 +270,11 @@ impl PromPlanner {
.context(DataFusionPlanningSnafu)?
.qualified_column();

let mut binary_expr = DfExpr::BinaryExpr(BinaryExpr {
left: Box::new(DfExpr::Column(left_col)),
op: Self::prom_token_to_binary_op(*op)?,
right: Box::new(DfExpr::Column(right_col)),
});
let binary_expr_builder = Self::prom_token_to_binary_expr_builder(*op)?;
let mut binary_expr = binary_expr_builder(
DfExpr::Column(left_col),
DfExpr::Column(right_col),
)?;
if is_comparison_op && should_return_bool {
binary_expr = DfExpr::Cast(Cast {
expr: Box::new(binary_expr),
Expand Down Expand Up @@ -1103,35 +1098,65 @@ impl PromPlanner {
PromExpr::Paren(ParenExpr { expr }) => Self::try_build_literal_expr(expr),
// TODO(ruihang): support Unary operator
PromExpr::Unary(UnaryExpr { expr, .. }) => Self::try_build_literal_expr(expr),
PromExpr::Binary(PromBinaryExpr { lhs, rhs, op, .. }) => {
PromExpr::Binary(PromBinaryExpr {
lhs,
rhs,
op,
modifier,
}) => {
let lhs = Self::try_build_literal_expr(lhs)?;
let rhs = Self::try_build_literal_expr(rhs)?;
let op = Self::prom_token_to_binary_op(*op).ok()?;
Some(DfExpr::BinaryExpr(BinaryExpr {
left: Box::new(lhs),
op,
right: Box::new(rhs),
}))
let is_comparison_op = Self::is_token_a_comparison_op(*op);
let expr_builder = Self::prom_token_to_binary_expr_builder(*op).ok()?;
let expr = expr_builder(lhs, rhs).ok()?;

let should_return_bool = if let Some(m) = modifier {
m.return_bool
} else {
false
};
if is_comparison_op && should_return_bool {
Some(DfExpr::Cast(Cast {
expr: Box::new(expr),
data_type: ArrowDataType::Float64,
}))
} else {
Some(expr)
}
}
}
}

fn prom_token_to_binary_op(token: TokenType) -> Result<Operator> {
/// Return a lambda to build binary expression from token.
/// Because some binary operator are function in DataFusion like `atan2` or `^`.
#[allow(clippy::type_complexity)]
fn prom_token_to_binary_expr_builder(
token: TokenType,
) -> Result<Box<dyn Fn(DfExpr, DfExpr) -> Result<DfExpr>>> {
match token.id() {
token::T_ADD => Ok(Operator::Plus),
token::T_SUB => Ok(Operator::Minus),
token::T_MUL => Ok(Operator::Multiply),
token::T_DIV => Ok(Operator::Divide),
token::T_MOD => Ok(Operator::Modulo),
token::T_EQLC => Ok(Operator::Eq),
token::T_NEQ => Ok(Operator::NotEq),
token::T_GTR => Ok(Operator::Gt),
token::T_LSS => Ok(Operator::Lt),
token::T_GTE => Ok(Operator::GtEq),
token::T_LTE => Ok(Operator::LtEq),
// TODO(ruihang): support these two operators
// token::T_POW => Ok(Operator::Power),
// token::T_ATAN2 => Ok(Operator::Atan2),
token::T_ADD => Ok(Box::new(|lhs, rhs| Ok(lhs + rhs))),
token::T_SUB => Ok(Box::new(|lhs, rhs| Ok(lhs - rhs))),
token::T_MUL => Ok(Box::new(|lhs, rhs| Ok(lhs * rhs))),
token::T_DIV => Ok(Box::new(|lhs, rhs| Ok(lhs / rhs))),
token::T_MOD => Ok(Box::new(|lhs: DfExpr, rhs| Ok(lhs % rhs))),
token::T_EQLC => Ok(Box::new(|lhs, rhs| Ok(lhs.eq(rhs)))),
token::T_NEQ => Ok(Box::new(|lhs, rhs| Ok(lhs.not_eq(rhs)))),
token::T_GTR => Ok(Box::new(|lhs, rhs| Ok(lhs.gt(rhs)))),
token::T_LSS => Ok(Box::new(|lhs, rhs| Ok(lhs.lt(rhs)))),
token::T_GTE => Ok(Box::new(|lhs, rhs| Ok(lhs.gt_eq(rhs)))),
token::T_LTE => Ok(Box::new(|lhs, rhs| Ok(lhs.lt_eq(rhs)))),
token::T_POW => Ok(Box::new(|lhs, rhs| {
Ok(DfExpr::ScalarFunction(ScalarFunction {
fun: BuiltinScalarFunction::Power,
args: vec![lhs, rhs],
}))
})),
token::T_ATAN2 => Ok(Box::new(|lhs, rhs| {
Ok(DfExpr::ScalarFunction(ScalarFunction {
fun: BuiltinScalarFunction::Atan2,
args: vec![lhs, rhs],
}))
})),
_ => UnexpectedTokenSnafu { token }.fail(),
}
}
Expand Down
Loading

0 comments on commit 6c4e1a2

Please sign in to comment.