diff --git a/tests/cases/standalone/common/flow/flow_basic.result b/tests/cases/standalone/common/flow/flow_basic.result new file mode 100644 index 000000000000..fb3d4a5c6dcf --- /dev/null +++ b/tests/cases/standalone/common/flow/flow_basic.result @@ -0,0 +1,134 @@ +CREATE TABLE numbers_input_basic ( + number INT, + ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY(number), + TIME INDEX(ts) +); + +Affected Rows: 0 + +CREATE FLOW test_numbers_basic +SINK TO out_num_cnt_basic +AS +SELECT sum(number) FROM numbers_input_basic GROUP BY tumble(ts, '1 second', '2021-07-01 00:00:00'); + +Affected Rows: 0 + +-- TODO(discord9): confirm if it's necessary to flush flow here? +select flush_flow('test_numbers_basic')*0; + ++---------------------------------------------------+ +| flush_flow(Utf8("test_numbers_basic")) * Int64(0) | ++---------------------------------------------------+ +| 0 | ++---------------------------------------------------+ + +INSERT INTO numbers_input_basic +VALUES + (20, "2021-07-01 00:00:00.200"), + (22, "2021-07-01 00:00:00.600"); + +Affected Rows: 2 + +select flush_flow('test_numbers_basic')*0; + ++---------------------------------------------------+ +| flush_flow(Utf8("test_numbers_basic")) * Int64(0) | ++---------------------------------------------------+ +| 0 | ++---------------------------------------------------+ + +SELECT col_0, window_start, window_end FROM out_num_cnt_basic; + ++-------+---------------------+---------------------+ +| col_0 | window_start | window_end | ++-------+---------------------+---------------------+ +| 42 | 2021-07-01T00:00:00 | 2021-07-01T00:00:01 | ++-------+---------------------+---------------------+ + +select flush_flow('test_numbers_basic')*0; + ++---------------------------------------------------+ +| flush_flow(Utf8("test_numbers_basic")) * Int64(0) | ++---------------------------------------------------+ +| 0 | ++---------------------------------------------------+ + +INSERT INTO numbers_input_basic +VALUES + (23,"2021-07-01 00:00:01.000"), + (24,"2021-07-01 00:00:01.500"); + +Affected Rows: 2 + +select flush_flow('test_numbers_basic')*0; + ++---------------------------------------------------+ +| flush_flow(Utf8("test_numbers_basic")) * Int64(0) | ++---------------------------------------------------+ +| 0 | ++---------------------------------------------------+ + +SELECT col_0, window_start, window_end FROM out_num_cnt_basic; + ++-------+---------------------+---------------------+ +| col_0 | window_start | window_end | ++-------+---------------------+---------------------+ +| 42 | 2021-07-01T00:00:00 | 2021-07-01T00:00:01 | +| 47 | 2021-07-01T00:00:01 | 2021-07-01T00:00:02 | ++-------+---------------------+---------------------+ + +DROP FLOW test_numbers_basic; + +Affected Rows: 0 + +DROP TABLE numbers_input_basic; + +Affected Rows: 0 + +DROP TABLE out_num_cnt_basic; + +Affected Rows: 0 + +-- test interprete interval +CREATE TABLE numbers_input_basic ( + number INT, + ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY(number), + TIME INDEX(ts) +); + +Affected Rows: 0 + +create table out_num_cnt_basic ( + number INT, + ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP TIME INDEX); + +Affected Rows: 0 + +CREATE FLOW filter_numbers_basic SINK TO out_num_cnt_basic AS SELECT INTERVAL '1 day 1 second', INTERVAL '1 month 1 day 1 second', INTERVAL '1 year 1 month' FROM numbers_input_basic where number > 10; + +Affected Rows: 0 + +SHOW CREATE FLOW filter_numbers_basic; + ++----------------------+----------------------------------------------------------------------------------------------------------------------------------------------+ +| Flow | Create Flow | ++----------------------+----------------------------------------------------------------------------------------------------------------------------------------------+ +| filter_numbers_basic | CREATE OR REPLACE FLOW IF NOT EXISTS filter_numbers_basic | +| | SINK TO out_num_cnt_basic | +| | AS SELECT INTERVAL '1 day 1 second', INTERVAL '1 month 1 day 1 second', INTERVAL '1 year 1 month' FROM numbers_input_basic WHERE number > 10 | ++----------------------+----------------------------------------------------------------------------------------------------------------------------------------------+ + +drop flow filter_numbers_basic; + +Affected Rows: 0 + +drop table out_num_cnt_basic; + +Affected Rows: 0 + +drop table numbers_input_basic; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/common/flow/flow_basic.sql b/tests/cases/standalone/common/flow/flow_basic.sql new file mode 100644 index 000000000000..de6fa42c4fcc --- /dev/null +++ b/tests/cases/standalone/common/flow/flow_basic.sql @@ -0,0 +1,60 @@ +CREATE TABLE numbers_input_basic ( + number INT, + ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY(number), + TIME INDEX(ts) +); + +CREATE FLOW test_numbers_basic +SINK TO out_num_cnt_basic +AS +SELECT sum(number) FROM numbers_input_basic GROUP BY tumble(ts, '1 second', '2021-07-01 00:00:00'); + +-- TODO(discord9): confirm if it's necessary to flush flow here? +select flush_flow('test_numbers_basic')*0; + +INSERT INTO numbers_input_basic +VALUES + (20, "2021-07-01 00:00:00.200"), + (22, "2021-07-01 00:00:00.600"); + +select flush_flow('test_numbers_basic')*0; + +SELECT col_0, window_start, window_end FROM out_num_cnt_basic; + +select flush_flow('test_numbers_basic')*0; + +INSERT INTO numbers_input_basic +VALUES + (23,"2021-07-01 00:00:01.000"), + (24,"2021-07-01 00:00:01.500"); + +select flush_flow('test_numbers_basic')*0; + +SELECT col_0, window_start, window_end FROM out_num_cnt_basic; + +DROP FLOW test_numbers_basic; +DROP TABLE numbers_input_basic; +DROP TABLE out_num_cnt_basic; + +-- test interprete interval + +CREATE TABLE numbers_input_basic ( + number INT, + ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY(number), + TIME INDEX(ts) +); +create table out_num_cnt_basic ( + number INT, + ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP TIME INDEX); + +CREATE FLOW filter_numbers_basic SINK TO out_num_cnt_basic AS SELECT INTERVAL '1 day 1 second', INTERVAL '1 month 1 day 1 second', INTERVAL '1 year 1 month' FROM numbers_input_basic where number > 10; + +SHOW CREATE FLOW filter_numbers_basic; + +drop flow filter_numbers_basic; + +drop table out_num_cnt_basic; + +drop table numbers_input_basic; \ No newline at end of file diff --git a/tests/cases/standalone/common/flow/flow_call_df_func.result b/tests/cases/standalone/common/flow/flow_call_df_func.result new file mode 100644 index 000000000000..804f74439353 --- /dev/null +++ b/tests/cases/standalone/common/flow/flow_call_df_func.result @@ -0,0 +1,370 @@ +CREATE TABLE numbers_input_df_func ( + number INT, + ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY(number), + TIME INDEX(ts) +); + +Affected Rows: 0 + +-- call `sum(abs(number))` where `abs` is DataFusion Function and `sum` is flow function +CREATE FLOW test_numbers_df_func +SINK TO out_num_cnt_df_func +AS +SELECT sum(abs(number)) FROM numbers_input_df_func GROUP BY tumble(ts, '1 second', '2021-07-01 00:00:00'); + +Affected Rows: 0 + +select flush_flow('test_numbers_df_func')*0; + ++-----------------------------------------------------+ +| flush_flow(Utf8("test_numbers_df_func")) * Int64(0) | ++-----------------------------------------------------+ +| 0 | ++-----------------------------------------------------+ + +INSERT INTO numbers_input_df_func +VALUES + (-20, "2021-07-01 00:00:00.200"), + (22, "2021-07-01 00:00:00.600"); + +Affected Rows: 2 + +-- flush flow to make sure that table is created and data is inserted +select flush_flow('test_numbers_df_func')*0; + ++-----------------------------------------------------+ +| flush_flow(Utf8("test_numbers_df_func")) * Int64(0) | ++-----------------------------------------------------+ +| 0 | ++-----------------------------------------------------+ + +SELECT col_0, window_start, window_end FROM out_num_cnt_df_func; + ++-------+---------------------+---------------------+ +| col_0 | window_start | window_end | ++-------+---------------------+---------------------+ +| 42 | 2021-07-01T00:00:00 | 2021-07-01T00:00:01 | ++-------+---------------------+---------------------+ + +select flush_flow('test_numbers_df_func')*0; + ++-----------------------------------------------------+ +| flush_flow(Utf8("test_numbers_df_func")) * Int64(0) | ++-----------------------------------------------------+ +| 0 | ++-----------------------------------------------------+ + +INSERT INTO numbers_input_df_func +VALUES + (23,"2021-07-01 00:00:01.000"), + (-24,"2021-07-01 00:00:01.500"); + +Affected Rows: 2 + +select flush_flow('test_numbers_df_func')*0; + ++-----------------------------------------------------+ +| flush_flow(Utf8("test_numbers_df_func")) * Int64(0) | ++-----------------------------------------------------+ +| 0 | ++-----------------------------------------------------+ + +SELECT col_0, window_start, window_end FROM out_num_cnt_df_func; + ++-------+---------------------+---------------------+ +| col_0 | window_start | window_end | ++-------+---------------------+---------------------+ +| 42 | 2021-07-01T00:00:00 | 2021-07-01T00:00:01 | +| 47 | 2021-07-01T00:00:01 | 2021-07-01T00:00:02 | ++-------+---------------------+---------------------+ + +DROP FLOW test_numbers_df_func; + +Affected Rows: 0 + +DROP TABLE numbers_input_df_func; + +Affected Rows: 0 + +DROP TABLE out_num_cnt_df_func; + +Affected Rows: 0 + +CREATE TABLE numbers_input_df_func ( + number INT, + ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY(number), + TIME INDEX(ts) +); + +Affected Rows: 0 + +-- call `abs(sum(number))`to make sure that calling `abs` function(impl by datafusion) on `sum` function(impl by flow) is working +CREATE FLOW test_numbers_df_func +SINK TO out_num_cnt_df_func +AS +SELECT abs(sum(number)) FROM numbers_input_df_func GROUP BY tumble(ts, '1 second', '2021-07-01 00:00:00'); + +Affected Rows: 0 + +select flush_flow('test_numbers_df_func')*0; + ++-----------------------------------------------------+ +| flush_flow(Utf8("test_numbers_df_func")) * Int64(0) | ++-----------------------------------------------------+ +| 0 | ++-----------------------------------------------------+ + +INSERT INTO numbers_input_df_func +VALUES + (-20, "2021-07-01 00:00:00.200"), + (22, "2021-07-01 00:00:00.600"); + +Affected Rows: 2 + +-- flush flow to make sure that table is created and data is inserted +select flush_flow('test_numbers_df_func')*0; + ++-----------------------------------------------------+ +| flush_flow(Utf8("test_numbers_df_func")) * Int64(0) | ++-----------------------------------------------------+ +| 0 | ++-----------------------------------------------------+ + +SELECT col_0, window_start, window_end FROM out_num_cnt_df_func; + ++-------+---------------------+---------------------+ +| col_0 | window_start | window_end | ++-------+---------------------+---------------------+ +| 2 | 2021-07-01T00:00:00 | 2021-07-01T00:00:01 | ++-------+---------------------+---------------------+ + +select flush_flow('test_numbers_df_func')*0; + ++-----------------------------------------------------+ +| flush_flow(Utf8("test_numbers_df_func")) * Int64(0) | ++-----------------------------------------------------+ +| 0 | ++-----------------------------------------------------+ + +INSERT INTO numbers_input_df_func +VALUES + (23,"2021-07-01 00:00:01.000"), + (-24,"2021-07-01 00:00:01.500"); + +Affected Rows: 2 + +select flush_flow('test_numbers_df_func')*0; + ++-----------------------------------------------------+ +| flush_flow(Utf8("test_numbers_df_func")) * Int64(0) | ++-----------------------------------------------------+ +| 0 | ++-----------------------------------------------------+ + +SELECT col_0, window_start, window_end FROM out_num_cnt_df_func; + ++-------+---------------------+---------------------+ +| col_0 | window_start | window_end | ++-------+---------------------+---------------------+ +| 2 | 2021-07-01T00:00:00 | 2021-07-01T00:00:01 | +| 1 | 2021-07-01T00:00:01 | 2021-07-01T00:00:02 | ++-------+---------------------+---------------------+ + +DROP FLOW test_numbers_df_func; + +Affected Rows: 0 + +DROP TABLE numbers_input_df_func; + +Affected Rows: 0 + +DROP TABLE out_num_cnt_df_func; + +Affected Rows: 0 + +-- test date_bin +CREATE TABLE numbers_input_df_func ( + number INT, + ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY(number), + TIME INDEX(ts) +); + +Affected Rows: 0 + +CREATE FLOW test_numbers_df_func +SINK TO out_num_cnt_df_func +AS +SELECT max(number) - min(number), date_bin(INTERVAL '1 second', ts, '2021-07-01 00:00:00'::TimestampNanosecond) FROM numbers_input_df_func GROUP BY date_bin(INTERVAL '1 second', ts, '2021-07-01 00:00:00'::TimestampNanosecond); + +Affected Rows: 0 + +select flush_flow('test_numbers_df_func')*0; + ++-----------------------------------------------------+ +| flush_flow(Utf8("test_numbers_df_func")) * Int64(0) | ++-----------------------------------------------------+ +| 0 | ++-----------------------------------------------------+ + +INSERT INTO numbers_input_df_func +VALUES + (20, "2021-07-01 00:00:00.200"), + (22, "2021-07-01 00:00:00.600"); + +Affected Rows: 2 + +select flush_flow('test_numbers_df_func')*0; + ++-----------------------------------------------------+ +| flush_flow(Utf8("test_numbers_df_func")) * Int64(0) | ++-----------------------------------------------------+ +| 0 | ++-----------------------------------------------------+ + +SELECT col_0, col_1 FROM out_num_cnt_df_func; + ++-------+---------------------+ +| col_0 | col_1 | ++-------+---------------------+ +| 2 | 2021-07-01T00:00:00 | ++-------+---------------------+ + +select flush_flow('test_numbers_df_func')*0; + ++-----------------------------------------------------+ +| flush_flow(Utf8("test_numbers_df_func")) * Int64(0) | ++-----------------------------------------------------+ +| 0 | ++-----------------------------------------------------+ + +INSERT INTO numbers_input_df_func +VALUES + (23,"2021-07-01 00:00:01.000"), + (24,"2021-07-01 00:00:01.500"); + +Affected Rows: 2 + +select flush_flow('test_numbers_df_func')*0; + ++-----------------------------------------------------+ +| flush_flow(Utf8("test_numbers_df_func")) * Int64(0) | ++-----------------------------------------------------+ +| 0 | ++-----------------------------------------------------+ + +SELECT col_0, col_1 FROM out_num_cnt_df_func; + ++-------+---------------------+ +| col_0 | col_1 | ++-------+---------------------+ +| 2 | 2021-07-01T00:00:00 | +| 1 | 2021-07-01T00:00:01 | ++-------+---------------------+ + +DROP FLOW test_numbers_df_func; + +Affected Rows: 0 + +DROP TABLE numbers_input_df_func; + +Affected Rows: 0 + +DROP TABLE out_num_cnt_df_func; + +Affected Rows: 0 + +-- test date_trunc +CREATE TABLE numbers_input_df_func ( + number INT, + ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY(number), + TIME INDEX(ts) +); + +Affected Rows: 0 + +CREATE FLOW test_numbers_df_func +SINK TO out_num_cnt +AS +SELECT date_trunc('second', ts), sum(number) FROM numbers_input_df_func GROUP BY date_trunc('second', ts); + +Affected Rows: 0 + +select flush_flow('test_numbers_df_func')*0; + ++-----------------------------------------------------+ +| flush_flow(Utf8("test_numbers_df_func")) * Int64(0) | ++-----------------------------------------------------+ +| 0 | ++-----------------------------------------------------+ + +INSERT INTO numbers_input_df_func +VALUES + (20, "2021-07-01 00:00:00.200"), + (22, "2021-07-01 00:00:00.600"); + +Affected Rows: 2 + +select flush_flow('test_numbers_df_func')*0; + ++-----------------------------------------------------+ +| flush_flow(Utf8("test_numbers_df_func")) * Int64(0) | ++-----------------------------------------------------+ +| 0 | ++-----------------------------------------------------+ + +SELECT col_0, col_1 FROM out_num_cnt; + ++---------------------+-------+ +| col_0 | col_1 | ++---------------------+-------+ +| 2021-07-01T00:00:00 | 42 | ++---------------------+-------+ + +select flush_flow('test_numbers_df_func')*0; + ++-----------------------------------------------------+ +| flush_flow(Utf8("test_numbers_df_func")) * Int64(0) | ++-----------------------------------------------------+ +| 0 | ++-----------------------------------------------------+ + +INSERT INTO numbers_input_df_func +VALUES + (23,"2021-07-01 00:00:01.000"), + (24,"2021-07-01 00:00:01.500"); + +Affected Rows: 2 + +select flush_flow('test_numbers_df_func')*0; + ++-----------------------------------------------------+ +| flush_flow(Utf8("test_numbers_df_func")) * Int64(0) | ++-----------------------------------------------------+ +| 0 | ++-----------------------------------------------------+ + +SELECT col_0, col_1 FROM out_num_cnt; + ++---------------------+-------+ +| col_0 | col_1 | ++---------------------+-------+ +| 2021-07-01T00:00:00 | 42 | +| 2021-07-01T00:00:01 | 47 | ++---------------------+-------+ + +DROP FLOW test_numbers_df_func; + +Affected Rows: 0 + +DROP TABLE numbers_input_df_func; + +Affected Rows: 0 + +DROP TABLE out_num_cnt; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/common/flow/flow_call_df_func.sql b/tests/cases/standalone/common/flow/flow_call_df_func.sql new file mode 100644 index 000000000000..61a6e58e3c27 --- /dev/null +++ b/tests/cases/standalone/common/flow/flow_call_df_func.sql @@ -0,0 +1,158 @@ +CREATE TABLE numbers_input_df_func ( + number INT, + ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY(number), + TIME INDEX(ts) +); + +-- call `sum(abs(number))` where `abs` is DataFusion Function and `sum` is flow function +CREATE FLOW test_numbers_df_func +SINK TO out_num_cnt_df_func +AS +SELECT sum(abs(number)) FROM numbers_input_df_func GROUP BY tumble(ts, '1 second', '2021-07-01 00:00:00'); + +select flush_flow('test_numbers_df_func')*0; + +INSERT INTO numbers_input_df_func +VALUES + (-20, "2021-07-01 00:00:00.200"), + (22, "2021-07-01 00:00:00.600"); + +-- flush flow to make sure that table is created and data is inserted +select flush_flow('test_numbers_df_func')*0; + +SELECT col_0, window_start, window_end FROM out_num_cnt_df_func; + +select flush_flow('test_numbers_df_func')*0; + +INSERT INTO numbers_input_df_func +VALUES + (23,"2021-07-01 00:00:01.000"), + (-24,"2021-07-01 00:00:01.500"); + +select flush_flow('test_numbers_df_func')*0; + +SELECT col_0, window_start, window_end FROM out_num_cnt_df_func; + +DROP FLOW test_numbers_df_func; +DROP TABLE numbers_input_df_func; +DROP TABLE out_num_cnt_df_func; + +CREATE TABLE numbers_input_df_func ( + number INT, + ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY(number), + TIME INDEX(ts) +); + +-- call `abs(sum(number))`to make sure that calling `abs` function(impl by datafusion) on `sum` function(impl by flow) is working +CREATE FLOW test_numbers_df_func +SINK TO out_num_cnt_df_func +AS +SELECT abs(sum(number)) FROM numbers_input_df_func GROUP BY tumble(ts, '1 second', '2021-07-01 00:00:00'); + +select flush_flow('test_numbers_df_func')*0; + +INSERT INTO numbers_input_df_func +VALUES + (-20, "2021-07-01 00:00:00.200"), + (22, "2021-07-01 00:00:00.600"); + +-- flush flow to make sure that table is created and data is inserted +select flush_flow('test_numbers_df_func')*0; + +SELECT col_0, window_start, window_end FROM out_num_cnt_df_func; + +select flush_flow('test_numbers_df_func')*0; + +INSERT INTO numbers_input_df_func +VALUES + (23,"2021-07-01 00:00:01.000"), + (-24,"2021-07-01 00:00:01.500"); + +select flush_flow('test_numbers_df_func')*0; + +SELECT col_0, window_start, window_end FROM out_num_cnt_df_func; + +DROP FLOW test_numbers_df_func; +DROP TABLE numbers_input_df_func; +DROP TABLE out_num_cnt_df_func; + +-- test date_bin +CREATE TABLE numbers_input_df_func ( + number INT, + ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY(number), + TIME INDEX(ts) +); + +CREATE FLOW test_numbers_df_func +SINK TO out_num_cnt_df_func +AS +SELECT max(number) - min(number), date_bin(INTERVAL '1 second', ts, '2021-07-01 00:00:00'::TimestampNanosecond) FROM numbers_input_df_func GROUP BY date_bin(INTERVAL '1 second', ts, '2021-07-01 00:00:00'::TimestampNanosecond); + +select flush_flow('test_numbers_df_func')*0; + +INSERT INTO numbers_input_df_func +VALUES + (20, "2021-07-01 00:00:00.200"), + (22, "2021-07-01 00:00:00.600"); + +select flush_flow('test_numbers_df_func')*0; + +SELECT col_0, col_1 FROM out_num_cnt_df_func; + +select flush_flow('test_numbers_df_func')*0; + +INSERT INTO numbers_input_df_func +VALUES + (23,"2021-07-01 00:00:01.000"), + (24,"2021-07-01 00:00:01.500"); + +select flush_flow('test_numbers_df_func')*0; + +SELECT col_0, col_1 FROM out_num_cnt_df_func; + +DROP FLOW test_numbers_df_func; +DROP TABLE numbers_input_df_func; +DROP TABLE out_num_cnt_df_func; + + +-- test date_trunc +CREATE TABLE numbers_input_df_func ( + number INT, + ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY(number), + TIME INDEX(ts) +); + +CREATE FLOW test_numbers_df_func +SINK TO out_num_cnt +AS +SELECT date_trunc('second', ts), sum(number) FROM numbers_input_df_func GROUP BY date_trunc('second', ts); + +select flush_flow('test_numbers_df_func')*0; + +INSERT INTO numbers_input_df_func +VALUES + (20, "2021-07-01 00:00:00.200"), + (22, "2021-07-01 00:00:00.600"); + +select flush_flow('test_numbers_df_func')*0; + +SELECT col_0, col_1 FROM out_num_cnt; + +select flush_flow('test_numbers_df_func')*0; + +INSERT INTO numbers_input_df_func +VALUES + (23,"2021-07-01 00:00:01.000"), + (24,"2021-07-01 00:00:01.500"); + +select flush_flow('test_numbers_df_func')*0; + +SELECT col_0, col_1 FROM out_num_cnt; + +DROP FLOW test_numbers_df_func; +DROP TABLE numbers_input_df_func; +DROP TABLE out_num_cnt; \ No newline at end of file diff --git a/tests/cases/standalone/common/flow/show_create_flow.result b/tests/cases/standalone/common/flow/show_create_flow.result index 004d614f97d0..a91930ee7620 100644 --- a/tests/cases/standalone/common/flow/show_create_flow.result +++ b/tests/cases/standalone/common/flow/show_create_flow.result @@ -1,4 +1,4 @@ -CREATE TABLE numbers_input ( +CREATE TABLE numbers_input_show ( number INT, ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP, PRIMARY KEY(number), @@ -7,75 +7,71 @@ CREATE TABLE numbers_input ( Affected Rows: 0 -create table out_num_cnt ( +create table out_num_cnt_show ( number INT, ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP TIME INDEX); Affected Rows: 0 -SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS; +SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show'; -+-----------+---------------+-----------------+ -| flow_name | table_catalog | flow_definition | -+-----------+---------------+-----------------+ -+-----------+---------------+-----------------+ +++ +++ -SHOW FLOWS; +SHOW FLOWS LIKE 'filter_numbers_show'; ++ ++ -CREATE FLOW filter_numbers SINK TO out_num_cnt AS SELECT number FROM numbers_input where number > 10; +CREATE FLOW filter_numbers_show SINK TO out_num_cnt_show AS SELECT number FROM numbers_input_show where number > 10; Affected Rows: 0 -SHOW CREATE FLOW filter_numbers; +SHOW CREATE FLOW filter_numbers_show; -+----------------+-------------------------------------------------------+ -| Flow | Create Flow | -+----------------+-------------------------------------------------------+ -| filter_numbers | CREATE OR REPLACE FLOW IF NOT EXISTS filter_numbers | -| | SINK TO out_num_cnt | -| | AS SELECT number FROM numbers_input WHERE number > 10 | -+----------------+-------------------------------------------------------+ ++---------------------+------------------------------------------------------------+ +| Flow | Create Flow | ++---------------------+------------------------------------------------------------+ +| filter_numbers_show | CREATE OR REPLACE FLOW IF NOT EXISTS filter_numbers_show | +| | SINK TO out_num_cnt_show | +| | AS SELECT number FROM numbers_input_show WHERE number > 10 | ++---------------------+------------------------------------------------------------+ -SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS; +SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show'; -+----------------+---------------+----------------------------------------------------+ -| flow_name | table_catalog | flow_definition | -+----------------+---------------+----------------------------------------------------+ -| filter_numbers | greptime | SELECT number FROM numbers_input WHERE number > 10 | -+----------------+---------------+----------------------------------------------------+ ++---------------------+---------------+---------------------------------------------------------+ +| flow_name | table_catalog | flow_definition | ++---------------------+---------------+---------------------------------------------------------+ +| filter_numbers_show | greptime | SELECT number FROM numbers_input_show WHERE number > 10 | ++---------------------+---------------+---------------------------------------------------------+ -SHOW FLOWS; +SHOW FLOWS LIKE 'filter_numbers_show'; -+----------------+ -| Flows | -+----------------+ -| filter_numbers | -+----------------+ ++---------------------+ +| Flows | ++---------------------+ +| filter_numbers_show | ++---------------------+ -drop flow filter_numbers; +drop flow filter_numbers_show; Affected Rows: 0 -SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS; +SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show'; -+-----------+---------------+-----------------+ -| flow_name | table_catalog | flow_definition | -+-----------+---------------+-----------------+ -+-----------+---------------+-----------------+ +++ +++ -SHOW FLOWS; +SHOW FLOWS LIKE 'filter_numbers_show'; ++ ++ -drop table out_num_cnt; +drop table out_num_cnt_show; Affected Rows: 0 -drop table numbers_input; +drop table numbers_input_show; Affected Rows: 0 diff --git a/tests/cases/standalone/common/flow/show_create_flow.sql b/tests/cases/standalone/common/flow/show_create_flow.sql index 0f5907877fd9..3cf84ed0f335 100644 --- a/tests/cases/standalone/common/flow/show_create_flow.sql +++ b/tests/cases/standalone/common/flow/show_create_flow.sql @@ -1,32 +1,32 @@ -CREATE TABLE numbers_input ( +CREATE TABLE numbers_input_show ( number INT, ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP, PRIMARY KEY(number), TIME INDEX(ts) ); -create table out_num_cnt ( +create table out_num_cnt_show ( number INT, ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP TIME INDEX); -SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS; +SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show'; -SHOW FLOWS; +SHOW FLOWS LIKE 'filter_numbers_show'; -CREATE FLOW filter_numbers SINK TO out_num_cnt AS SELECT number FROM numbers_input where number > 10; +CREATE FLOW filter_numbers_show SINK TO out_num_cnt_show AS SELECT number FROM numbers_input_show where number > 10; -SHOW CREATE FLOW filter_numbers; +SHOW CREATE FLOW filter_numbers_show; -SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS; +SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show'; -SHOW FLOWS; +SHOW FLOWS LIKE 'filter_numbers_show'; -drop flow filter_numbers; +drop flow filter_numbers_show; -SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS; +SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show'; -SHOW FLOWS; +SHOW FLOWS LIKE 'filter_numbers_show'; -drop table out_num_cnt; +drop table out_num_cnt_show; -drop table numbers_input; +drop table numbers_input_show;