diff --git a/src/flow/src/compute/render/reduce.rs b/src/flow/src/compute/render/reduce.rs index d44c290d9474..5e52daaedd1c 100644 --- a/src/flow/src/compute/render/reduce.rs +++ b/src/flow/src/compute/render/reduce.rs @@ -776,170 +776,6 @@ mod test { use crate::expr::{self, AggregateFunc, BinaryFunc, GlobalId, MapFilterProject, UnaryFunc}; use crate::repr::{ColumnType, RelationType}; - /// SELECT sum(number) FROM numbers_with_ts GROUP BY tumble(ts, '1 second', '2021-07-01 00:00:00') - /// input table columns: number, ts - /// expected: sum(number), window_start, window_end - #[test] - fn test_tumble_group_by() { - let mut df = Hydroflow::new(); - let mut state = DataflowState::default(); - let mut ctx = harness_test_ctx(&mut df, &mut state); - const START: i64 = 1625097600000; - let rows = vec![ - (1u32, START + 1000), - (2u32, START + 1500), - (3u32, START + 2000), - (1u32, START + 2500), - (2u32, START + 3000), - (3u32, START + 3500), - ]; - let rows = rows - .into_iter() - .map(|(number, ts)| { - ( - Row::new(vec![number.into(), Timestamp::new_millisecond(ts).into()]), - 1, - 1, - ) - }) - .collect_vec(); - - let collection = ctx.render_constant(rows.clone()); - ctx.insert_global(GlobalId::User(1), collection); - - let aggr_expr = AggregateExpr { - func: AggregateFunc::SumUInt32, - expr: ScalarExpr::Column(0), - distinct: false, - }; - let expected = TypedPlan { - schema: RelationType::new(vec![ - ColumnType::new(CDT::uint64_datatype(), true), // sum(number) - ColumnType::new(CDT::datetime_datatype(), false), // window start - ColumnType::new(CDT::datetime_datatype(), false), // window end - ]) - .into_unnamed(), - // TODO(discord9): mfp indirectly ref to key columns - /* - .with_key(vec![1]) - .with_time_index(Some(0)),*/ - plan: Plan::Mfp { - input: Box::new( - Plan::Reduce { - input: Box::new( - Plan::Get { - id: crate::expr::Id::Global(GlobalId::User(1)), - } - .with_types( - RelationType::new(vec![ - ColumnType::new(ConcreteDataType::uint32_datatype(), false), - ColumnType::new(ConcreteDataType::datetime_datatype(), false), - ]) - .into_unnamed(), - ), - ), - key_val_plan: KeyValPlan { - key_plan: MapFilterProject::new(2) - .map(vec![ - ScalarExpr::Column(1).call_unary( - UnaryFunc::TumbleWindowFloor { - window_size: Interval::from_month_day_nano( - 0, - 0, - 1_000_000_000, - ), - start_time: Some(DateTime::new(1625097600000)), - }, - ), - ScalarExpr::Column(1).call_unary( - UnaryFunc::TumbleWindowCeiling { - window_size: Interval::from_month_day_nano( - 0, - 0, - 1_000_000_000, - ), - start_time: Some(DateTime::new(1625097600000)), - }, - ), - ]) - .unwrap() - .project(vec![2, 3]) - .unwrap() - .into_safe(), - val_plan: MapFilterProject::new(2) - .project(vec![0, 1]) - .unwrap() - .into_safe(), - }, - reduce_plan: ReducePlan::Accumulable(AccumulablePlan { - full_aggrs: vec![aggr_expr.clone()], - simple_aggrs: vec![AggrWithIndex::new(aggr_expr.clone(), 0, 0)], - distinct_aggrs: vec![], - }), - } - .with_types( - RelationType::new(vec![ - ColumnType::new(CDT::datetime_datatype(), false), // window start - ColumnType::new(CDT::datetime_datatype(), false), // window end - ColumnType::new(CDT::uint64_datatype(), true), //sum(number) - ]) - .with_key(vec![1]) - .with_time_index(Some(0)) - .into_unnamed(), - ), - ), - mfp: MapFilterProject::new(3) - .map(vec![ - ScalarExpr::Column(2), - ScalarExpr::Column(3), - ScalarExpr::Column(0), - ScalarExpr::Column(1), - ]) - .unwrap() - .project(vec![4, 5, 6]) - .unwrap(), - }, - }; - - let bundle = ctx.render_plan(expected).unwrap(); - - let 
output = get_output_handle(&mut ctx, bundle); - drop(ctx); - let expected = BTreeMap::from([( - 1, - vec![ - ( - Row::new(vec![ - 3u64.into(), - Timestamp::new_millisecond(START + 1000).into(), - Timestamp::new_millisecond(START + 2000).into(), - ]), - 1, - 1, - ), - ( - Row::new(vec![ - 4u64.into(), - Timestamp::new_millisecond(START + 2000).into(), - Timestamp::new_millisecond(START + 3000).into(), - ]), - 1, - 1, - ), - ( - Row::new(vec![ - 5u64.into(), - Timestamp::new_millisecond(START + 3000).into(), - Timestamp::new_millisecond(START + 4000).into(), - ]), - 1, - 1, - ), - ], - )]); - run_and_check(&mut state, &mut df, 1..2, expected, output); - } - /// select avg(number) from number; #[test] fn test_avg_eval() { diff --git a/src/flow/src/expr/func.rs b/src/flow/src/expr/func.rs index 39b469207169..8714d2f1b2f7 100644 --- a/src/flow/src/expr/func.rs +++ b/src/flow/src/expr/func.rs @@ -47,11 +47,6 @@ use crate::repr::{self, value_to_internal_ts, Row}; pub enum UnmaterializableFunc { Now, CurrentSchema, - TumbleWindow { - ts: Box<TypedExpr>, - window_size: common_time::Interval, - start_time: Option<common_time::DateTime>, - }, } impl UnmaterializableFunc { @@ -68,58 +63,18 @@ impl UnmaterializableFunc { output: ConcreteDataType::string_datatype(), generic_fn: GenericFn::CurrentSchema, }, - Self::TumbleWindow { .. } => Signature { - input: smallvec![ConcreteDataType::timestamp_millisecond_datatype()], - output: ConcreteDataType::timestamp_millisecond_datatype(), - generic_fn: GenericFn::TumbleWindow, - }, } } pub fn is_valid_func_name(name: &str) -> bool { - matches!( - name.to_lowercase().as_str(), - "now" | "current_schema" | "tumble" - ) + matches!(name.to_lowercase().as_str(), "now" | "current_schema") } /// Create a UnmaterializableFunc from a string of the function name - pub fn from_str_args(name: &str, args: Vec<TypedExpr>) -> Result<Self, Error> { + pub fn from_str_args(name: &str, _args: Vec<TypedExpr>) -> Result<Self, Error> { match name.to_lowercase().as_str() { "now" => Ok(Self::Now), "current_schema" => Ok(Self::CurrentSchema), - "tumble" => { - let ts = args.first().context(InvalidQuerySnafu { - reason: "Tumble window function requires a timestamp argument", - })?; - let window_size = args - .get(1) - .and_then(|expr| expr.expr.as_literal()) - .context(InvalidQuerySnafu { - reason: "Tumble window function requires a window size argument" - })?.as_string() // TODO(discord9): since df to substrait convertor does not support interval type yet, we need to take a string and cast it to interval instead - .map(|s|cast(Value::from(s), &ConcreteDataType::interval_month_day_nano_datatype())).transpose().map_err(BoxedError::new).context( - ExternalSnafu - )?.and_then(|v|v.as_interval()) - .with_context(||InvalidQuerySnafu { - reason: format!("Tumble window function requires window size argument to be a string describe a interval, found {:?}", args.get(1)) - })?; - let start_time = match args.get(2) { - Some(start_time) => start_time.expr.as_literal(), - None => None, - } - .map(|s| cast(s.clone(), &ConcreteDataType::datetime_datatype())).transpose().map_err(BoxedError::new).context(ExternalSnafu)?.map(|v|v.as_datetime().with_context( - ||InvalidQuerySnafu { - reason: format!("Tumble window function requires start time argument to be a datetime describe in string, found {:?}", args.get(2)) - } - )).transpose()?; - - Ok(Self::TumbleWindow { - ts: Box::new(ts.clone()), - window_size, - start_time, - }) - } _ => InvalidQuerySnafu { reason: format!("Unknown unmaterializable function: {}", name), } @@ -138,14 +93,6 @@ pub enum UnaryFunc { IsFalse, StepTimestamp,
Cast(ConcreteDataType), - TumbleWindowFloor { - window_size: common_time::Interval, - start_time: Option<common_time::DateTime>, - }, - TumbleWindowCeiling { - window_size: common_time::Interval, - start_time: Option<common_time::DateTime>, - }, } impl UnaryFunc { @@ -177,16 +124,6 @@ impl UnaryFunc { output: to.clone(), generic_fn: GenericFn::Cast, }, - Self::TumbleWindowFloor { .. } => Signature { - input: smallvec![ConcreteDataType::timestamp_millisecond_datatype()], - output: ConcreteDataType::timestamp_millisecond_datatype(), - generic_fn: GenericFn::TumbleWindow, - }, - Self::TumbleWindowCeiling { .. } => Signature { - input: smallvec![ConcreteDataType::timestamp_millisecond_datatype()], - output: ConcreteDataType::timestamp_millisecond_datatype(), - generic_fn: GenericFn::TumbleWindow, - }, } } @@ -287,31 +224,6 @@ impl UnaryFunc { debug!("Cast to type: {to:?}, result: {:?}", res); res } - Self::TumbleWindowFloor { - window_size, - start_time, - } => { - let ts = get_ts_as_millisecond(arg)?; - let start_time = start_time.map(|t| t.val()); - let window_size = (window_size.to_nanosecond() / 1_000_000) as repr::Duration; // nanosecond to millisecond - let window_start = get_window_start(ts, window_size, start_time); - - let ret = Timestamp::new_millisecond(window_start); - Ok(Value::from(ret)) - } - Self::TumbleWindowCeiling { - window_size, - start_time, - } => { - let ts = get_ts_as_millisecond(arg)?; - let start_time = start_time.map(|t| t.val()); - let window_size = (window_size.to_nanosecond() / 1_000_000) as repr::Duration; // nanosecond to millisecond - let window_start = get_window_start(ts, window_size, start_time); - - let window_end = window_start + window_size; - let ret = Timestamp::new_millisecond(window_end); - Ok(Value::from(ret)) - } } } } diff --git a/src/flow/src/expr/scalar.rs b/src/flow/src/expr/scalar.rs index 42ed61d4f579..46c91020265d 100644 --- a/src/flow/src/expr/scalar.rs +++ b/src/flow/src/expr/scalar.rs @@ -58,77 +58,6 @@ impl TypedExpr { } } -impl TypedExpr { - /// expand multi-value expression to multiple expressions with new indices - /// - /// Currently it just mean expand `TumbleWindow` to `TumbleWindowFloor` and `TumbleWindowCeiling` - /// - /// TODO(discord9): test if nested reduce combine with df scalar function would cause problem - pub fn expand_multi_value( - input_typ: &RelationType, - exprs: &[TypedExpr], - ) -> Result<Vec<TypedExpr>, Error> { - // old indices in mfp, expanded expr - let mut ret = vec![]; - let input_arity = input_typ.column_types.len(); - for (old_idx, expr) in exprs.iter().enumerate() { - if let ScalarExpr::CallUnmaterializable(UnmaterializableFunc::TumbleWindow { - ts, - window_size, - start_time, - }) = &expr.expr - { - let floor = UnaryFunc::TumbleWindowFloor { - window_size: *window_size, - start_time: *start_time, - }; - let ceil = UnaryFunc::TumbleWindowCeiling { - window_size: *window_size, - start_time: *start_time, - }; - let floor = ScalarExpr::CallUnary { - func: floor, - expr: Box::new(ts.expr.clone()), - } - .with_type(ts.typ.clone()); - ret.push((None, floor)); - - let ceil = ScalarExpr::CallUnary { - func: ceil, - expr: Box::new(ts.expr.clone()), - } - .with_type(ts.typ.clone()); - ret.push((None, ceil)); - } else { - ret.push((Some(input_arity + old_idx), expr.clone())) - } - } - - // get shuffled index(old_idx -> new_idx) - // note index is offset by input_arity because mfp is designed to be first include input columns then intermediate columns - let shuffle = ret - .iter() - .map(|(old_idx, _)| *old_idx) // [Option<usize>] - .enumerate() - .map(|(new, old)| (old, new +
input_arity)) - .flat_map(|(old, new)| old.map(|o| (o, new))) - .chain((0..input_arity).map(|i| (i, i))) // also remember to chain the input columns as not changed - .collect::<BTreeMap<_, _>>(); - - // shuffle expr's index - let exprs = ret - .into_iter() - .map(|(_, mut expr)| { - // invariant: it is expect that no expr will try to refer the column being expanded - expr.expr.permute_map(&shuffle)?; - Ok(expr) - }) - .collect::<Result<Vec<_>, _>>()?; - - Ok(exprs) - } -} - /// A scalar expression, which can be evaluated to a value. #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] pub enum ScalarExpr { diff --git a/src/flow/src/expr/signature.rs b/src/flow/src/expr/signature.rs index d61a60dea5e2..a7615502a520 100644 --- a/src/flow/src/expr/signature.rs +++ b/src/flow/src/expr/signature.rs @@ -64,5 +64,4 @@ pub enum GenericFn { // unmaterized func Now, CurrentSchema, - TumbleWindow, } diff --git a/src/flow/src/transform.rs b/src/flow/src/transform.rs index ab3fdd87c001..6be074be3e38 100644 --- a/src/flow/src/transform.rs +++ b/src/flow/src/transform.rs @@ -146,46 +146,8 @@ pub async fn sql_to_flow_plan( } /// register flow-specific functions to the query engine -pub fn register_function_to_query_engine(engine: &Arc<dyn QueryEngine>) { - engine.register_function(Arc::new(TumbleFunction {})); -} - -#[derive(Debug)] -pub struct TumbleFunction {} - -const TUMBLE_NAME: &str = "tumble"; - -impl std::fmt::Display for TumbleFunction { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - write!(f, "{}", TUMBLE_NAME.to_ascii_uppercase()) - } -} - -impl common_function::function::Function for TumbleFunction { - fn name(&self) -> &str { - TUMBLE_NAME - } - - fn return_type(&self, _input_types: &[CDT]) -> common_query::error::Result<CDT> { - Ok(CDT::datetime_datatype()) - } - - fn signature(&self) -> common_query::prelude::Signature { - common_query::prelude::Signature::variadic_any(common_query::prelude::Volatility::Immutable) - } - - fn eval( - &self, - _func_ctx: common_function::function::FunctionContext, - _columns: &[datatypes::prelude::VectorRef], - ) -> common_query::error::Result<datatypes::prelude::VectorRef> { - UnexpectedSnafu { - reason: "Tumbler function is not implemented for datafusion executor", - } - .fail() - .map_err(BoxedError::new) - .context(common_query::error::ExecuteSnafu) - } +pub fn register_function_to_query_engine(_engine: &Arc<dyn QueryEngine>) { + // TODO(discord9): add custom functions here } #[cfg(test)] @@ -295,7 +257,6 @@ mod test { let factory = query::QueryEngineFactory::new(catalog_list, None, None, None, false); let engine = factory.query_engine(); - engine.register_function(Arc::new(TumbleFunction {})); assert_eq!("datafusion", engine.name()); engine diff --git a/src/flow/src/transform/aggr.rs b/src/flow/src/transform/aggr.rs index 64ecc3eec506..fe5154b61725 100644 --- a/src/flow/src/transform/aggr.rs +++ b/src/flow/src/transform/aggr.rs @@ -291,20 +291,6 @@ impl KeyValPlan { } } -/// find out the column that should be time index in group exprs(which is all columns that should be keys) -/// TODO(discord9): better ways to assign time index -fn find_time_index_in_group_exprs(group_exprs: &[TypedExpr]) -> Option<usize> { - group_exprs.iter().position(|expr| { - matches!( - &expr.expr, - ScalarExpr::CallUnary { - func: UnaryFunc::TumbleWindowFloor { ..
}, - expr: _ - } - ) - }) -} - impl TypedPlan { /// Convert AggregateRel into Flow's TypedPlan /// @@ -323,19 +309,11 @@ impl TypedPlan { return not_impl_err!("Aggregate without an input is not supported"); }; - let group_exprs = { - let group_exprs = TypedExpr::from_substrait_agg_grouping( - ctx, - &agg.groupings, - &input.schema, - extensions, - ) - .await?; - - TypedExpr::expand_multi_value(&input.schema.typ, &group_exprs)? - }; + let group_exprs = + TypedExpr::from_substrait_agg_grouping(ctx, &agg.groupings, &input.schema, extensions) + .await?; - let time_index = find_time_index_in_group_exprs(&group_exprs); + let time_index = None; let (mut aggr_exprs, post_mfp) = AggregateExpr::from_substrait_agg_measures( ctx, @@ -356,23 +334,10 @@ impl TypedPlan { let mut output_types = Vec::new(); // give best effort to get column name let mut output_names = Vec::new(); - // mark all auto added cols - let mut auto_cols = vec![]; // first append group_expr as key, then aggr_expr as value - for (idx, expr) in group_exprs.iter().enumerate() { + for expr in group_exprs.iter() { output_types.push(expr.typ.clone()); let col_name = match &expr.expr { - ScalarExpr::CallUnary { - func: UnaryFunc::TumbleWindowFloor { .. }, - .. - } => Some("window_start".to_string()), - ScalarExpr::CallUnary { - func: UnaryFunc::TumbleWindowCeiling { .. }, - .. - } => { - auto_cols.push(idx); - Some("window_end".to_string()) - } ScalarExpr::Column(col) => input.schema.get_name(*col).clone(), _ => None, }; @@ -393,7 +358,6 @@ impl TypedPlan { RelationType::new(output_types).with_key((0..group_exprs.len()).collect_vec()) } .with_time_index(time_index) - .with_autos(&auto_cols) .into_named(output_names) }; @@ -479,23 +443,11 @@ mod test { use crate::plan::{Plan, TypedPlan}; use crate::repr::{self, ColumnType, RelationType}; use crate::transform::test::{create_test_ctx, create_test_query_engine, sql_to_substrait}; - /// TODO(discord9): add more illegal sql tests - #[tokio::test] - async fn test_missing_key_check() { - let engine = create_test_query_engine(); - let sql = "SELECT avg(number) FROM numbers_with_ts GROUP BY tumble(ts, '1 hour'), number"; - let plan = sql_to_substrait(engine.clone(), sql).await; - - let mut ctx = create_test_ctx(); - assert!(TypedPlan::from_substrait_plan(&mut ctx, &plan) - .await - .is_err()); - } #[tokio::test] async fn test_df_func_basic() { let engine = create_test_query_engine(); - let sql = "SELECT sum(abs(number)) FROM numbers_with_ts GROUP BY tumble(ts, '1 second', '2021-07-01 00:00:00');"; + let sql = "SELECT ts, sum(abs(number)) FROM numbers_with_ts GROUP BY ts;"; let plan = sql_to_substrait(engine.clone(), sql).await; let mut ctx = create_test_ctx(); @@ -510,18 +462,11 @@ mod test { }; let expected = TypedPlan { schema: RelationType::new(vec![ - ColumnType::new(CDT::uint64_datatype(), true), // sum(number) - ColumnType::new(CDT::datetime_datatype(), false), // window start - ColumnType::new(CDT::datetime_datatype(), false), // window end + ColumnType::new(CDT::datetime_datatype(), false), // ts + ColumnType::new(CDT::uint64_datatype(), true), // sum(abs(number)) ]) - .with_key(vec![2]) - .with_time_index(Some(1)) - .with_autos(&[2]) - .into_named(vec![ - None, - Some("window_start".to_string()), - Some("window_end".to_string()), - ]), + .with_key(vec![0]) + .into_named(vec![Some("ts".to_string()), None]), plan: Plan::Mfp { input: Box::new( Plan::Reduce { @@ -542,30 +487,9 @@ mod test { ), key_val_plan: KeyValPlan { key_plan: MapFilterProject::new(2) - .map(vec![ - 
ScalarExpr::Column(1).call_unary( - UnaryFunc::TumbleWindowFloor { - window_size: Interval::from_month_day_nano( - 0, - 0, - 1_000_000_000, - ), - start_time: Some(DateTime::new(1625097600000)), - }, - ), - ScalarExpr::Column(1).call_unary( - UnaryFunc::TumbleWindowCeiling { - window_size: Interval::from_month_day_nano( - 0, - 0, - 1_000_000_000, - ), - start_time: Some(DateTime::new(1625097600000)), - }, - ), - ]) + .map(vec![ScalarExpr::Column(1)]) .unwrap() - .project(vec![2, 3]) + .project(vec![2]) .unwrap() .into_safe(), val_plan: MapFilterProject::new(2) @@ -573,8 +497,7 @@ mod test { df_scalar_fn: DfScalarFunction::try_from_raw_fn( RawDfScalarFn { f: BytesMut::from( - b"\x08\x01\"\x08\x1a\x06\x12\x04\n\x02\x12\0" - .as_ref(), + b"\"\x08\x1a\x06\x12\x04\n\x02\x12\0".as_ref(), ), input_schema: RelationType::new(vec![ColumnType::new( ConcreteDataType::uint32_datatype(), @@ -583,9 +506,8 @@ mod test { .into_unnamed(), extensions: FunctionExtensions { anchor_to_name: BTreeMap::from([ - (0, "tumble".to_string()), - (1, "abs".to_string()), - (2, "sum".to_string()), + (0, "abs".to_string()), + (1, "sum".to_string()), ]), }, }, @@ -607,39 +529,31 @@ mod test { } .with_types( RelationType::new(vec![ - ColumnType::new(CDT::datetime_datatype(), false), // window start - ColumnType::new(CDT::datetime_datatype(), false), // window end - ColumnType::new(CDT::uint64_datatype(), true), //sum(number) + ColumnType::new(CDT::datetime_datatype(), false), // ts + ColumnType::new(CDT::uint64_datatype(), true), // sum(..) ]) - .with_key(vec![1]) - .with_time_index(Some(0)) - .with_autos(&[1]) - .into_named(vec![ - Some("window_start".to_string()), - Some("window_end".to_string()), - None, - ]), + .with_key(vec![0]) + .into_named(vec![Some("ts".to_string()), None]), ), ), - mfp: MapFilterProject::new(3) + mfp: MapFilterProject::new(2) .map(vec![ + ScalarExpr::Column(1), // ts + ScalarExpr::Column(0), // sum(..) 
ScalarExpr::Column(2), - ScalarExpr::Column(3), - ScalarExpr::Column(0), - ScalarExpr::Column(1), ]) .unwrap() - .project(vec![4, 5, 6]) + .project(vec![3, 4]) .unwrap(), }, }; - assert_eq!(expected, flow_plan); + assert_eq!(flow_plan, expected); } #[tokio::test] async fn test_df_func_expr_tree() { let engine = create_test_query_engine(); - let sql = "SELECT abs(sum(number)) FROM numbers_with_ts GROUP BY tumble(ts, '1 second', '2021-07-01 00:00:00');"; + let sql = "SELECT ts, abs(sum(number)) FROM numbers_with_ts GROUP BY ts;"; let plan = sql_to_substrait(engine.clone(), sql).await; let mut ctx = create_test_ctx(); @@ -652,20 +566,14 @@ mod test { expr: ScalarExpr::Column(0), distinct: false, }; + let expected = TypedPlan { schema: RelationType::new(vec![ - ColumnType::new(CDT::uint64_datatype(), true), // sum(number) - ColumnType::new(CDT::datetime_datatype(), false), // window start - ColumnType::new(CDT::datetime_datatype(), false), // window end + ColumnType::new(CDT::datetime_datatype(), false), // ts + ColumnType::new(CDT::uint64_datatype(), true), // sum(number) ]) - .with_key(vec![2]) - .with_time_index(Some(1)) - .with_autos(&[2]) - .into_named(vec![ - None, - Some("window_start".to_string()), - Some("window_end".to_string()), - ]), + .with_key(vec![0]) + .into_named(vec![Some("ts".to_string()), None]), plan: Plan::Mfp { input: Box::new( Plan::Reduce { @@ -686,30 +594,9 @@ mod test { ), key_val_plan: KeyValPlan { key_plan: MapFilterProject::new(2) - .map(vec![ - ScalarExpr::Column(1).call_unary( - UnaryFunc::TumbleWindowFloor { - window_size: Interval::from_month_day_nano( - 0, - 0, - 1_000_000_000, - ), - start_time: Some(DateTime::new(1625097600000)), - }, - ), - ScalarExpr::Column(1).call_unary( - UnaryFunc::TumbleWindowCeiling { - window_size: Interval::from_month_day_nano( - 0, - 0, - 1_000_000_000, - ), - start_time: Some(DateTime::new(1625097600000)), - }, - ), - ]) + .map(vec![ScalarExpr::Column(1)]) .unwrap() - .project(vec![2, 3]) + .project(vec![2]) .unwrap() .into_safe(), val_plan: MapFilterProject::new(2) @@ -725,23 +612,17 @@ mod test { } .with_types( RelationType::new(vec![ - ColumnType::new(CDT::datetime_datatype(), false), // window start - ColumnType::new(CDT::datetime_datatype(), false), // window end - ColumnType::new(CDT::uint64_datatype(), true), //sum(number) + ColumnType::new(CDT::datetime_datatype(), false), // ts + ColumnType::new(CDT::uint64_datatype(), true), // sum(number) ]) - .with_key(vec![1]) - .with_time_index(Some(0)) - .with_autos(&[1]) - .into_named(vec![ - Some("window_start".to_string()), - Some("window_end".to_string()), - None, - ]), + .with_key(vec![0]) + .into_named(vec![Some("ts".to_string()), None]), ), ), - mfp: MapFilterProject::new(3) + mfp: MapFilterProject::new(2) .map(vec![ - ScalarExpr::Column(2), + ScalarExpr::Column(1), + ScalarExpr::Column(0), ScalarExpr::CallDf { df_scalar_fn: DfScalarFunction::try_from_raw_fn(RawDfScalarFn { f: BytesMut::from(b"\"\x08\x1a\x06\x12\x04\n\x02\x12\0".as_ref()), @@ -753,407 +634,17 @@ mod test { extensions: FunctionExtensions { anchor_to_name: BTreeMap::from([ (0, "abs".to_string()), - (1, "tumble".to_string()), - (2, "sum".to_string()), + (1, "sum".to_string()), ]), }, }) .await .unwrap(), - exprs: vec![ScalarExpr::Column(3)], + exprs: vec![ScalarExpr::Column(2)], }, - ScalarExpr::Column(0), - ScalarExpr::Column(1), ]) .unwrap() - .project(vec![4, 5, 6]) - .unwrap(), - }, - }; - assert_eq!(expected, flow_plan); - } - - /// TODO(discord9): add more illegal sql tests - #[tokio::test] - async fn 
test_tumble_composite() { - let engine = create_test_query_engine(); - let sql = - "SELECT number, avg(number) FROM numbers_with_ts GROUP BY tumble(ts, '1 hour'), number"; - let plan = sql_to_substrait(engine.clone(), sql).await; - - let mut ctx = create_test_ctx(); - let flow_plan = TypedPlan::from_substrait_plan(&mut ctx, &plan) - .await - .unwrap(); - - let aggr_exprs = vec![ - AggregateExpr { - func: AggregateFunc::SumUInt32, - expr: ScalarExpr::Column(0), - distinct: false, - }, - AggregateExpr { - func: AggregateFunc::Count, - expr: ScalarExpr::Column(0), - distinct: false, - }, - ]; - let avg_expr = ScalarExpr::If { - cond: Box::new(ScalarExpr::Column(4).call_binary( - ScalarExpr::Literal(Value::from(0i64), CDT::int64_datatype()), - BinaryFunc::NotEq, - )), - then: Box::new(ScalarExpr::Column(3).call_binary( - ScalarExpr::Column(4).call_unary(UnaryFunc::Cast(CDT::uint64_datatype())), - BinaryFunc::DivUInt64, - )), - els: Box::new(ScalarExpr::Literal(Value::Null, CDT::uint64_datatype())), - }; - let expected = TypedPlan { - plan: Plan::Mfp { - input: Box::new( - Plan::Reduce { - input: Box::new( - Plan::Get { - id: crate::expr::Id::Global(GlobalId::User(1)), - } - .with_types( - RelationType::new(vec![ - ColumnType::new(ConcreteDataType::uint32_datatype(), false), - ColumnType::new(ConcreteDataType::datetime_datatype(), false), - ]) - .into_named(vec![ - Some("number".to_string()), - Some("ts".to_string()), - ]), - ), - ), - key_val_plan: KeyValPlan { - key_plan: MapFilterProject::new(2) - .map(vec![ - ScalarExpr::Column(1).call_unary( - UnaryFunc::TumbleWindowFloor { - window_size: Interval::from_month_day_nano( - 0, - 0, - 3_600_000_000_000, - ), - start_time: None, - }, - ), - ScalarExpr::Column(1).call_unary( - UnaryFunc::TumbleWindowCeiling { - window_size: Interval::from_month_day_nano( - 0, - 0, - 3_600_000_000_000, - ), - start_time: None, - }, - ), - ScalarExpr::Column(0), - ]) - .unwrap() - .project(vec![2, 3, 4]) - .unwrap() - .into_safe(), - val_plan: MapFilterProject::new(2) - .project(vec![0, 1]) - .unwrap() - .into_safe(), - }, - reduce_plan: ReducePlan::Accumulable(AccumulablePlan { - full_aggrs: aggr_exprs.clone(), - simple_aggrs: vec![ - AggrWithIndex::new(aggr_exprs[0].clone(), 0, 0), - AggrWithIndex::new(aggr_exprs[1].clone(), 0, 1), - ], - distinct_aggrs: vec![], - }), - } - .with_types( - RelationType::new(vec![ - // keys - ColumnType::new(CDT::datetime_datatype(), false), // window start(time index) - ColumnType::new(CDT::datetime_datatype(), false), // window end(pk) - ColumnType::new(CDT::uint32_datatype(), false), // number(pk) - // values - ColumnType::new(CDT::uint64_datatype(), true), // avg.sum(number) - ColumnType::new(CDT::int64_datatype(), true), // avg.count(number) - ]) - .with_key(vec![1, 2]) - .with_time_index(Some(0)) - .with_autos(&[1]) - .into_named(vec![ - Some("window_start".to_string()), - Some("window_end".to_string()), - Some("number".to_string()), - None, - None, - ]), - ), - ), - mfp: MapFilterProject::new(5) - .map(vec![ - avg_expr, - ScalarExpr::Column(2), // number(pk) - ScalarExpr::Column(5), // avg.sum(number) - ScalarExpr::Column(0), // window start - ScalarExpr::Column(1), // window end - ]) - .unwrap() - .project(vec![6, 7, 8, 9]) - .unwrap(), - }, - schema: RelationType::new(vec![ - ColumnType::new(CDT::uint32_datatype(), false), // number - ColumnType::new(CDT::uint64_datatype(), true), // avg(number) - ColumnType::new(CDT::datetime_datatype(), false), // window start - ColumnType::new(CDT::datetime_datatype(), false), // 
window end - ]) - .with_key(vec![0, 3]) - .with_time_index(Some(2)) - .with_autos(&[3]) - .into_named(vec![ - Some("number".to_string()), - None, - Some("window_start".to_string()), - Some("window_end".to_string()), - ]), - }; - assert_eq!(flow_plan, expected); - } - - #[tokio::test] - async fn test_tumble_parse_optional() { - let engine = create_test_query_engine(); - let sql = "SELECT sum(number) FROM numbers_with_ts GROUP BY tumble(ts, '1 hour')"; - let plan = sql_to_substrait(engine.clone(), sql).await; - - let mut ctx = create_test_ctx(); - let flow_plan = TypedPlan::from_substrait_plan(&mut ctx, &plan) - .await - .unwrap(); - - let aggr_expr = AggregateExpr { - func: AggregateFunc::SumUInt32, - expr: ScalarExpr::Column(0), - distinct: false, - }; - let expected = TypedPlan { - schema: RelationType::new(vec![ - ColumnType::new(CDT::uint64_datatype(), true), // sum(number) - ColumnType::new(CDT::datetime_datatype(), false), // window start - ColumnType::new(CDT::datetime_datatype(), false), // window end - ]) - .with_key(vec![2]) - .with_time_index(Some(1)) - .with_autos(&[2]) - .into_named(vec![ - None, - Some("window_start".to_string()), - Some("window_end".to_string()), - ]), - plan: Plan::Mfp { - input: Box::new( - Plan::Reduce { - input: Box::new( - Plan::Get { - id: crate::expr::Id::Global(GlobalId::User(1)), - } - .with_types( - RelationType::new(vec![ - ColumnType::new(ConcreteDataType::uint32_datatype(), false), - ColumnType::new(ConcreteDataType::datetime_datatype(), false), - ]) - .into_named(vec![ - Some("number".to_string()), - Some("ts".to_string()), - ]), - ), - ), - key_val_plan: KeyValPlan { - key_plan: MapFilterProject::new(2) - .map(vec![ - ScalarExpr::Column(1).call_unary( - UnaryFunc::TumbleWindowFloor { - window_size: Interval::from_month_day_nano( - 0, - 0, - 3_600_000_000_000, - ), - start_time: None, - }, - ), - ScalarExpr::Column(1).call_unary( - UnaryFunc::TumbleWindowCeiling { - window_size: Interval::from_month_day_nano( - 0, - 0, - 3_600_000_000_000, - ), - start_time: None, - }, - ), - ]) - .unwrap() - .project(vec![2, 3]) - .unwrap() - .into_safe(), - val_plan: MapFilterProject::new(2) - .project(vec![0, 1]) - .unwrap() - .into_safe(), - }, - reduce_plan: ReducePlan::Accumulable(AccumulablePlan { - full_aggrs: vec![aggr_expr.clone()], - simple_aggrs: vec![AggrWithIndex::new(aggr_expr.clone(), 0, 0)], - distinct_aggrs: vec![], - }), - } - .with_types( - RelationType::new(vec![ - ColumnType::new(CDT::datetime_datatype(), false), // window start - ColumnType::new(CDT::datetime_datatype(), false), // window end - ColumnType::new(CDT::uint64_datatype(), true), //sum(number) - ]) - .with_key(vec![1]) - .with_time_index(Some(0)) - .with_autos(&[1]) - .into_named(vec![ - Some("window_start".to_string()), - Some("window_end".to_string()), - None, - ]), - ), - ), - mfp: MapFilterProject::new(3) - .map(vec![ - ScalarExpr::Column(2), - ScalarExpr::Column(3), - ScalarExpr::Column(0), - ScalarExpr::Column(1), - ]) - .unwrap() - .project(vec![4, 5, 6]) - .unwrap(), - }, - }; - assert_eq!(flow_plan, expected); - } - - #[tokio::test] - async fn test_tumble_parse() { - let engine = create_test_query_engine(); - let sql = "SELECT sum(number) FROM numbers_with_ts GROUP BY tumble(ts, '1 hour', '2021-07-01 00:00:00')"; - let plan = sql_to_substrait(engine.clone(), sql).await; - - let mut ctx = create_test_ctx(); - let flow_plan = TypedPlan::from_substrait_plan(&mut ctx, &plan) - .await - .unwrap(); - - let aggr_expr = AggregateExpr { - func: AggregateFunc::SumUInt32, - expr: 
ScalarExpr::Column(0), - distinct: false, - }; - let expected = TypedPlan { - schema: RelationType::new(vec![ - ColumnType::new(CDT::uint64_datatype(), true), // sum(number) - ColumnType::new(CDT::datetime_datatype(), false), // window start - ColumnType::new(CDT::datetime_datatype(), false), // window end - ]) - .with_key(vec![2]) - .with_time_index(Some(1)) - .with_autos(&[2]) - .into_named(vec![ - None, - Some("window_start".to_string()), - Some("window_end".to_string()), - ]), - plan: Plan::Mfp { - input: Box::new( - Plan::Reduce { - input: Box::new( - Plan::Get { - id: crate::expr::Id::Global(GlobalId::User(1)), - } - .with_types( - RelationType::new(vec![ - ColumnType::new(ConcreteDataType::uint32_datatype(), false), - ColumnType::new(ConcreteDataType::datetime_datatype(), false), - ]) - .into_named(vec![ - Some("number".to_string()), - Some("ts".to_string()), - ]), - ), - ), - key_val_plan: KeyValPlan { - key_plan: MapFilterProject::new(2) - .map(vec![ - ScalarExpr::Column(1).call_unary( - UnaryFunc::TumbleWindowFloor { - window_size: Interval::from_month_day_nano( - 0, - 0, - 3_600_000_000_000, - ), - start_time: Some(DateTime::new(1625097600000)), - }, - ), - ScalarExpr::Column(1).call_unary( - UnaryFunc::TumbleWindowCeiling { - window_size: Interval::from_month_day_nano( - 0, - 0, - 3_600_000_000_000, - ), - start_time: Some(DateTime::new(1625097600000)), - }, - ), - ]) - .unwrap() - .project(vec![2, 3]) - .unwrap() - .into_safe(), - val_plan: MapFilterProject::new(2) - .project(vec![0, 1]) - .unwrap() - .into_safe(), - }, - reduce_plan: ReducePlan::Accumulable(AccumulablePlan { - full_aggrs: vec![aggr_expr.clone()], - simple_aggrs: vec![AggrWithIndex::new(aggr_expr.clone(), 0, 0)], - distinct_aggrs: vec![], - }), - } - .with_types( - RelationType::new(vec![ - ColumnType::new(CDT::datetime_datatype(), false), // window start - ColumnType::new(CDT::datetime_datatype(), false), // window end - ColumnType::new(CDT::uint64_datatype(), true), //sum(number) - ]) - .with_key(vec![1]) - .with_time_index(Some(0)) - .with_autos(&[1]) - .into_named(vec![ - Some("window_start".to_string()), - Some("window_end".to_string()), - None, - ]), - ), - ), - mfp: MapFilterProject::new(3) - .map(vec![ - ScalarExpr::Column(2), - ScalarExpr::Column(3), - ScalarExpr::Column(0), - ScalarExpr::Column(1), - ]) - .unwrap() - .project(vec![4, 5, 6]) + .project(vec![3, 4]) .unwrap(), }, }; diff --git a/src/flow/src/transform/expr.rs b/src/flow/src/transform/expr.rs index b2784b08bcc1..20bc78d2861d 100644 --- a/src/flow/src/transform/expr.rs +++ b/src/flow/src/transform/expr.rs @@ -459,7 +459,6 @@ impl TypedExpr { #[cfg(test)] mod test { - use common_time::{DateTime, Interval}; use datatypes::prelude::ConcreteDataType; use datatypes::value::Value; use pretty_assertions::assert_eq; @@ -737,65 +736,5 @@ mod test { }, } ); - - let f = substrait_proto::proto::expression::ScalarFunction { - function_reference: 0, - arguments: vec![proto_col(0), lit("1 second"), lit("2021-07-01 00:00:00")], - options: vec![], - output_type: None, - ..Default::default() - }; - let input_schema = RelationType::new(vec![ - ColumnType::new(CDT::timestamp_nanosecond_datatype(), false), - ColumnType::new(CDT::string_datatype(), false), - ]) - .into_unnamed(); - let extensions = FunctionExtensions::from_iter(vec![(0, "tumble".to_string())]); - let res = TypedExpr::from_substrait_scalar_func(&f, &input_schema, &extensions) - .await - .unwrap(); - - assert_eq!( - res, - ScalarExpr::CallUnmaterializable(UnmaterializableFunc::TumbleWindow { - 
ts: Box::new( - ScalarExpr::Column(0) - .with_type(ColumnType::new(CDT::timestamp_nanosecond_datatype(), false)) - ), - window_size: Interval::from_month_day_nano(0, 0, 1_000_000_000), - start_time: Some(DateTime::new(1625097600000)) - }) - .with_type(ColumnType::new(CDT::timestamp_millisecond_datatype(), true)), - ); - - let f = substrait_proto::proto::expression::ScalarFunction { - function_reference: 0, - arguments: vec![proto_col(0), lit("1 second")], - options: vec![], - output_type: None, - ..Default::default() - }; - let input_schema = RelationType::new(vec![ - ColumnType::new(CDT::timestamp_nanosecond_datatype(), false), - ColumnType::new(CDT::string_datatype(), false), - ]) - .into_unnamed(); - let extensions = FunctionExtensions::from_iter(vec![(0, "tumble".to_string())]); - let res = TypedExpr::from_substrait_scalar_func(&f, &input_schema, &extensions) - .await - .unwrap(); - - assert_eq!( - res, - ScalarExpr::CallUnmaterializable(UnmaterializableFunc::TumbleWindow { - ts: Box::new( - ScalarExpr::Column(0) - .with_type(ColumnType::new(CDT::timestamp_nanosecond_datatype(), false)) - ), - window_size: Interval::from_month_day_nano(0, 0, 1_000_000_000), - start_time: None - }) - .with_type(ColumnType::new(CDT::timestamp_millisecond_datatype(), true)), - ) } } diff --git a/src/flow/src/transform/plan.rs b/src/flow/src/transform/plan.rs index 200226fb352a..35441c925ab7 100644 --- a/src/flow/src/transform/plan.rs +++ b/src/flow/src/transform/plan.rs @@ -76,25 +76,9 @@ impl TypedPlan { return not_impl_err!("Projection without an input is not supported"); }; - - // because this `input.schema` is incorrect for pre-expand substrait plan, so we have to use schema before expand multi-value - // function to correctly transform it, and late rewrite it - let schema_before_expand = { - let input_schema = input.schema.clone(); - let auto_columns: HashSet<usize> = - HashSet::from_iter(input_schema.typ().auto_columns.clone()); - let not_auto_added_columns = (0..input_schema.len()?) - .filter(|i| !auto_columns.contains(i)) - .collect_vec(); - let mfp = MapFilterProject::new(input_schema.len()?) - .project(not_auto_added_columns)? - .into_safe(); - - input_schema.apply_mfp(&mfp)? - }; - let mut exprs: Vec<TypedExpr> = Vec::with_capacity(p.expressions.len()); for e in &p.expressions { - let expr = TypedExpr::from_substrait_rex(e, &schema_before_expand, extensions).await?; + let expr = TypedExpr::from_substrait_rex(e, &input.schema, extensions).await?; exprs.push(expr); } let is_literal = exprs.iter().all(|expr| expr.expr.is_literal()); @@ -117,17 +101,6 @@ impl TypedPlan { plan, }) } else { - match input.plan.clone() { - Plan::Reduce { key_val_plan, .. } => { - rewrite_projection_after_reduce(key_val_plan, &input.schema, &mut exprs)?; - } - Plan::Mfp { input, mfp: _ } => { - if let Plan::Reduce { key_val_plan, ..
} = input.plan { - rewrite_projection_after_reduce(key_val_plan, &input.schema, &mut exprs)?; - } - } - _ => (), - } input.projection(exprs) } } @@ -235,113 +208,6 @@ impl TypedPlan { } } -/// if reduce_plan contains the special function like tumble floor/ceiling, add them to the proj_exprs -/// so the effect is the window_start, window_end column are auto added to output rows -/// -/// This is to fix a problem that we have certain functions that return two values, but since substrait doesn't know that, it will assume it return one value -/// this function fix that and rewrite `proj_exprs` to correct form -fn rewrite_projection_after_reduce( - key_val_plan: KeyValPlan, - reduce_output_type: &RelationDesc, - proj_exprs: &mut Vec<TypedExpr>, -) -> Result<(), Error> { - // TODO(discord9): get keys correctly - let key_exprs = key_val_plan - .key_plan - .projection - .clone() - .into_iter() - .map(|i| { - if i < key_val_plan.key_plan.input_arity { - ScalarExpr::Column(i) - } else { - key_val_plan.key_plan.expressions[i - key_val_plan.key_plan.input_arity].clone() - } - }) - .collect_vec(); - let mut shift_offset = 0; - let mut shuffle: BTreeMap<usize, usize> = BTreeMap::new(); - let special_keys = key_exprs - .clone() - .into_iter() - .enumerate() - .filter(|(idx, p)| { - shuffle.insert(*idx, *idx + shift_offset); - if matches!( - p, - ScalarExpr::CallUnary { - func: UnaryFunc::TumbleWindowFloor { .. }, - .. - } | ScalarExpr::CallUnary { - func: UnaryFunc::TumbleWindowCeiling { .. }, - .. - } - ) { - if matches!( - p, - ScalarExpr::CallUnary { - func: UnaryFunc::TumbleWindowFloor { .. }, - .. - } - ) { - shift_offset += 1; - } - true - } else { - false - } - }) - .collect_vec(); - let spec_key_arity = special_keys.len(); - if spec_key_arity == 0 { - return Ok(()); - } - - // shuffle proj_exprs - // because substrait use offset while assume `tumble` only return one value - for proj_expr in proj_exprs.iter_mut() { - proj_expr.expr.permute_map(&shuffle)?; - } // add key to the end - for (key_idx, _key_expr) in special_keys { - // here we assume the output type of reduce operator(`reduce_output_type`) is just first keys columns, then append value columns - // so we can use `key_idx` to index `reduce_output_type` and get the keys we need to append to `proj_exprs` - proj_exprs.push( - ScalarExpr::Column(key_idx) - .with_type(reduce_output_type.typ().column_types[key_idx].clone()), - ); - } - - // check if normal expr in group exprs are all in proj_exprs - let all_cols_ref_in_proj: BTreeSet<usize> = proj_exprs - .iter() - .filter_map(|e| { - if let ScalarExpr::Column(i) = &e.expr { - Some(*i) - } else { - None - } - }) - .collect(); - for (key_idx, key_expr) in key_exprs.iter().enumerate() { - if let ScalarExpr::Column(_) = key_expr { - if !all_cols_ref_in_proj.contains(&key_idx) { - let err_msg = format!( - "Expect normal column in group by also appear in projection, but column {}(name is {}) is missing", - key_idx, - reduce_output_type - .get_name(key_idx) - .clone() - .map(|s|format!("'{}'",s)) - .unwrap_or("unknown".to_string()) - ); - return InvalidQuerySnafu { reason: err_msg }.fail(); - } - } - } - - Ok(()) -} - #[cfg(test)] mod test { use datatypes::prelude::ConcreteDataType; diff --git a/tests/cases/standalone/common/flow/basic.result b/tests/cases/standalone/common/flow/basic.result index 1d480e2f2277..f6fdec31c220 100644 --- a/tests/cases/standalone/common/flow/basic.result +++ b/tests/cases/standalone/common/flow/basic.result @@ -7,67 +7,6 @@ CREATE TABLE numbers_input ( Affected Rows: 0 -CREATE FLOW test_numbers -SINK
TO out_num_cnt -AS -SELECT sum(number) FROM numbers_input GROUP BY tumble(ts, '1 second', '2021-07-01 00:00:00'); - -Affected Rows: 0 - -INSERT INTO numbers_input -VALUES - (20, "2021-07-01 00:00:00.200"), - (22, "2021-07-01 00:00:00.600"); - -Affected Rows: 2 - --- SQLNESS SLEEP 3s -SELECT col_0, window_start, window_end FROM out_num_cnt; - -+-------+---------------------+---------------------+ -| col_0 | window_start | window_end | -+-------+---------------------+---------------------+ -| 42 | 2021-07-01T00:00:00 | 2021-07-01T00:00:01 | -+-------+---------------------+---------------------+ - -INSERT INTO numbers_input -VALUES - (23,"2021-07-01 00:00:01.000"), - (24,"2021-07-01 00:00:01.500"); - -Affected Rows: 2 - --- SQLNESS SLEEP 2s -SELECT col_0, window_start, window_end FROM out_num_cnt; - -+-------+---------------------+---------------------+ -| col_0 | window_start | window_end | -+-------+---------------------+---------------------+ -| 42 | 2021-07-01T00:00:00 | 2021-07-01T00:00:01 | -| 47 | 2021-07-01T00:00:01 | 2021-07-01T00:00:02 | -+-------+---------------------+---------------------+ - -DROP FLOW test_numbers; - -Affected Rows: 0 - -DROP TABLE numbers_input; - -Affected Rows: 0 - -DROP TABLE out_num_cnt; - -Affected Rows: 0 - -CREATE TABLE numbers_input ( - number INT, - ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP, - PRIMARY KEY(number), - TIME INDEX(ts) -); - -Affected Rows: 0 - CREATE FLOW test_numbers SINK TO out_num_cnt AS diff --git a/tests/cases/standalone/common/flow/basic.sql b/tests/cases/standalone/common/flow/basic.sql index 8c0c5d038ef6..54800b111b2a 100644 --- a/tests/cases/standalone/common/flow/basic.sql +++ b/tests/cases/standalone/common/flow/basic.sql @@ -5,38 +5,6 @@ CREATE TABLE numbers_input ( TIME INDEX(ts) ); -CREATE FLOW test_numbers -SINK TO out_num_cnt -AS -SELECT sum(number) FROM numbers_input GROUP BY tumble(ts, '1 second', '2021-07-01 00:00:00'); - -INSERT INTO numbers_input -VALUES - (20, "2021-07-01 00:00:00.200"), - (22, "2021-07-01 00:00:00.600"); - --- SQLNESS SLEEP 3s -SELECT col_0, window_start, window_end FROM out_num_cnt; - -INSERT INTO numbers_input -VALUES - (23,"2021-07-01 00:00:01.000"), - (24,"2021-07-01 00:00:01.500"); - --- SQLNESS SLEEP 2s -SELECT col_0, window_start, window_end FROM out_num_cnt; - -DROP FLOW test_numbers; -DROP TABLE numbers_input; -DROP TABLE out_num_cnt; - -CREATE TABLE numbers_input ( - number INT, - ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP, - PRIMARY KEY(number), - TIME INDEX(ts) -); - CREATE FLOW test_numbers SINK TO out_num_cnt AS diff --git a/tests/cases/standalone/common/flow/df_func.result b/tests/cases/standalone/common/flow/df_func.result index 7ab393eeb10e..e69de29bb2d1 100644 --- a/tests/cases/standalone/common/flow/df_func.result +++ b/tests/cases/standalone/common/flow/df_func.result @@ -1,126 +0,0 @@ -CREATE TABLE numbers_input_df_func ( - number INT, - ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP, - PRIMARY KEY(number), - TIME INDEX(ts) -); - -Affected Rows: 0 - --- call `sum(abs(number))` where `abs` is DataFusion Function and `sum` is flow function -CREATE FLOW test_numbers_df_func -SINK TO out_num_cnt_df_func -AS -SELECT sum(abs(number)) FROM numbers_input_df_func GROUP BY tumble(ts, '1 second', '2021-07-01 00:00:00'); - -Affected Rows: 0 - -INSERT INTO numbers_input_df_func -VALUES - (-20, "2021-07-01 00:00:00.200"), - (22, "2021-07-01 00:00:00.600"); - -Affected Rows: 2 - --- sleep a little bit longer to make sure that table is created and data is inserted --- SQLNESS SLEEP 3s -SELECT col_0, 
window_start, window_end FROM out_num_cnt_df_func; - -+-------+---------------------+---------------------+ -| col_0 | window_start | window_end | -+-------+---------------------+---------------------+ -| 42 | 2021-07-01T00:00:00 | 2021-07-01T00:00:01 | -+-------+---------------------+---------------------+ - -INSERT INTO numbers_input_df_func -VALUES - (23,"2021-07-01 00:00:01.000"), - (-24,"2021-07-01 00:00:01.500"); - -Affected Rows: 2 - --- SQLNESS SLEEP 2s -SELECT col_0, window_start, window_end FROM out_num_cnt_df_func; - -+-------+---------------------+---------------------+ -| col_0 | window_start | window_end | -+-------+---------------------+---------------------+ -| 42 | 2021-07-01T00:00:00 | 2021-07-01T00:00:01 | -| 47 | 2021-07-01T00:00:01 | 2021-07-01T00:00:02 | -+-------+---------------------+---------------------+ - -DROP FLOW test_numbers_df_func; - -Affected Rows: 0 - -DROP TABLE numbers_input_df_func; - -Affected Rows: 0 - -DROP TABLE out_num_cnt_df_func; - -Affected Rows: 0 - -CREATE TABLE numbers_input_df_func ( - number INT, - ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP, - PRIMARY KEY(number), - TIME INDEX(ts) -); - -Affected Rows: 0 - --- call `abs(sum(number))`to make sure that calling `abs` function(impl by datafusion) on `sum` function(impl by flow) is working -CREATE FLOW test_numbers_df_func -SINK TO out_num_cnt_df_func -AS -SELECT abs(sum(number)) FROM numbers_input_df_func GROUP BY tumble(ts, '1 second', '2021-07-01 00:00:00'); - -Affected Rows: 0 - -INSERT INTO numbers_input_df_func -VALUES - (-20, "2021-07-01 00:00:00.200"), - (22, "2021-07-01 00:00:00.600"); - -Affected Rows: 2 - --- sleep a little bit longer to make sure that table is created and data is inserted --- SQLNESS SLEEP 3s -SELECT col_0, window_start, window_end FROM out_num_cnt_df_func; - -+-------+---------------------+---------------------+ -| col_0 | window_start | window_end | -+-------+---------------------+---------------------+ -| 2 | 2021-07-01T00:00:00 | 2021-07-01T00:00:01 | -+-------+---------------------+---------------------+ - -INSERT INTO numbers_input_df_func -VALUES - (23,"2021-07-01 00:00:01.000"), - (-24,"2021-07-01 00:00:01.500"); - -Affected Rows: 2 - --- SQLNESS SLEEP 2s -SELECT col_0, window_start, window_end FROM out_num_cnt_df_func; - -+-------+---------------------+---------------------+ -| col_0 | window_start | window_end | -+-------+---------------------+---------------------+ -| 2 | 2021-07-01T00:00:00 | 2021-07-01T00:00:01 | -| 1 | 2021-07-01T00:00:01 | 2021-07-01T00:00:02 | -+-------+---------------------+---------------------+ - -DROP FLOW test_numbers_df_func; - -Affected Rows: 0 - -DROP TABLE numbers_input_df_func; - -Affected Rows: 0 - -DROP TABLE out_num_cnt_df_func; - -Affected Rows: 0 - diff --git a/tests/cases/standalone/common/flow/df_func.sql b/tests/cases/standalone/common/flow/df_func.sql index b9a22cb9da6d..8b137891791f 100644 --- a/tests/cases/standalone/common/flow/df_func.sql +++ b/tests/cases/standalone/common/flow/df_func.sql @@ -1,67 +1 @@ -CREATE TABLE numbers_input_df_func ( - number INT, - ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP, - PRIMARY KEY(number), - TIME INDEX(ts) -); --- call `sum(abs(number))` where `abs` is DataFusion Function and `sum` is flow function -CREATE FLOW test_numbers_df_func -SINK TO out_num_cnt_df_func -AS -SELECT sum(abs(number)) FROM numbers_input_df_func GROUP BY tumble(ts, '1 second', '2021-07-01 00:00:00'); - -INSERT INTO numbers_input_df_func -VALUES - (-20, "2021-07-01 00:00:00.200"), - (22, "2021-07-01 00:00:00.600"); 
- --- sleep a little bit longer to make sure that table is created and data is inserted --- SQLNESS SLEEP 3s -SELECT col_0, window_start, window_end FROM out_num_cnt_df_func; - -INSERT INTO numbers_input_df_func -VALUES - (23,"2021-07-01 00:00:01.000"), - (-24,"2021-07-01 00:00:01.500"); - --- SQLNESS SLEEP 2s -SELECT col_0, window_start, window_end FROM out_num_cnt_df_func; - -DROP FLOW test_numbers_df_func; -DROP TABLE numbers_input_df_func; -DROP TABLE out_num_cnt_df_func; - -CREATE TABLE numbers_input_df_func ( - number INT, - ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP, - PRIMARY KEY(number), - TIME INDEX(ts) -); - --- call `abs(sum(number))`to make sure that calling `abs` function(impl by datafusion) on `sum` function(impl by flow) is working -CREATE FLOW test_numbers_df_func -SINK TO out_num_cnt_df_func -AS -SELECT abs(sum(number)) FROM numbers_input_df_func GROUP BY tumble(ts, '1 second', '2021-07-01 00:00:00'); - -INSERT INTO numbers_input_df_func -VALUES - (-20, "2021-07-01 00:00:00.200"), - (22, "2021-07-01 00:00:00.600"); - --- sleep a little bit longer to make sure that table is created and data is inserted --- SQLNESS SLEEP 3s -SELECT col_0, window_start, window_end FROM out_num_cnt_df_func; - -INSERT INTO numbers_input_df_func -VALUES - (23,"2021-07-01 00:00:01.000"), - (-24,"2021-07-01 00:00:01.500"); - --- SQLNESS SLEEP 2s -SELECT col_0, window_start, window_end FROM out_num_cnt_df_func; - -DROP FLOW test_numbers_df_func; -DROP TABLE numbers_input_df_func; -DROP TABLE out_num_cnt_df_func;