test: move kafka alter source test to source_inline
Signed-off-by: xxchan <xxchan22f@gmail.com>
xxchan committed Oct 11, 2024
1 parent b01906e commit 588ea5d
Showing 10 changed files with 157 additions and 149 deletions.
14 changes: 0 additions & 14 deletions ci/scripts/e2e-source-test.sh
@@ -152,20 +152,6 @@ risedev ci-start ci-kafka
./scripts/source/prepare_ci_kafka.sh
risedev slt './e2e_test/source/basic/*.slt'
risedev slt './e2e_test/source/basic/old_row_format_syntax/*.slt'
risedev slt './e2e_test/source/basic/alter/kafka.slt'

echo "--- e2e, kafka alter source rate limit"
risedev slt './e2e_test/source/basic/alter/rate_limit_source_kafka.slt'
risedev slt './e2e_test/source/basic/alter/rate_limit_table_kafka.slt'

echo "--- e2e, kafka alter source"
chmod +x ./scripts/source/prepare_data_after_alter.sh
./scripts/source/prepare_data_after_alter.sh 2
risedev slt './e2e_test/source/basic/alter/kafka_after_new_data.slt'

echo "--- e2e, kafka alter source again"
./scripts/source/prepare_data_after_alter.sh 3
risedev slt './e2e_test/source/basic/alter/kafka_after_new_data_2.slt'

echo "--- Run CH-benCHmark"
risedev slt './e2e_test/ch_benchmark/batch/ch_benchmark.slt'
82 changes: 0 additions & 82 deletions e2e_test/source/basic/alter/kafka_after_new_data.slt

This file was deleted.

14 changes: 0 additions & 14 deletions e2e_test/source/basic/alter/kafka_after_new_data_2.slt

This file was deleted.

@@ -1,24 +1,34 @@
control substitution on

system ok
rpk topic delete kafka_alter || true

system ok
rpk topic create kafka_alter -p 1

system ok
cat <<EOF | rpk topic produce kafka_alter -f "%v\n"
{"v1": 1, "v2": "11", "v3": 111}
EOF

statement ok
CREATE SOURCE s1 (v1 int) with (
connector = 'kafka',
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'kafka_alter',
properties.bootstrap.server = 'message_queue:29092',
scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;

statement ok
CREATE SOURCE s2 (v2 varchar) with (
connector = 'kafka',
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'kafka_alter',
properties.bootstrap.server = 'message_queue:29092',
scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;

statement ok
CREATE TABLE t (v1 int) with (
connector = 'kafka',
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'kafka_alter',
properties.bootstrap.server = 'message_queue:29092',
scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;

@@ -29,7 +39,7 @@ create materialized view mv1 as select * from s1;
statement ok
create materialized view mv2 as select * from s2;

sleep 5s
sleep 2s

statement ok
flush;
@@ -70,7 +80,7 @@ create materialized view mv4 as select * from s2;
statement ok
alter table t add column v2 varchar;

sleep 5s
sleep 2s

statement ok
flush;
@@ -117,7 +127,7 @@ alter source s1 add column v3 int;
statement ok
create materialized view mv5 as select * from s1;

sleep 5s
sleep 2s

statement ok
flush;
@@ -133,7 +143,129 @@ select * from mv5
1 11 111

# check definition after altering
query TTTT
describe s1;
----
v1 integer false NULL
_rw_kafka_timestamp timestamp with time zone true NULL
_row_id serial true NULL
v2 character varying false NULL
v3 integer false NULL
primary key _row_id NULL NULL
table description s1 NULL NULL

query TT
show create source s1;
----
public.s1 CREATE SOURCE s1 (v1 INT, v2 CHARACTER VARYING, v3 INT) WITH (connector = 'kafka', topic = 'kafka_alter', properties.bootstrap.server = 'message_queue:29092', scan.startup.mode = 'earliest') FORMAT PLAIN ENCODE JSON


system ok
cat <<EOF | rpk topic produce kafka_alter -f "%v\n"
{"v1": 2, "v2": "22", "v3": 222}
EOF


sleep 2s

statement ok
flush;

query IT rowsort
select * from s1
----
1 11 111
2 22 222

query I rowsort
select * from mv1
----
1
2

query IT rowsort
select * from mv3
----
1 11
2 22

query TI rowsort
select * from s2
----
11 NULL
22 NULL

query T rowsort
select * from mv2
----
11
22

query TI rowsort
select * from mv4
----
11 NULL
22 NULL

query ITI rowsort
select * from mv5
----
1 11 111
2 22 222

query IT rowsort
select * from t
----
1 NULL
2 22

statement ok
alter table t add column v3 int;

query IT rowsort
select * from t
----
1 NULL NULL
2 22 NULL

statement ok
drop materialized view mv1

statement ok
drop materialized view mv2

statement ok
drop materialized view mv3

statement ok
drop materialized view mv4

statement ok
drop materialized view mv5

statement ok
drop source s1

statement ok
drop source s2


system ok
cat <<EOF | rpk topic produce kafka_alter -f "%v\n"
{"v1": 3, "v2": "33", "v3": 333}
EOF

sleep 2s

statement ok
flush;

query IT rowsort
select * from t
----
1 NULL NULL
2 22 NULL
3 33 333

statement ok
drop table t;
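
The relocated test relies on the source_inline conventions visible throughout the diff above: "control substitution on" enables variable substitution, the Kafka topic is provisioned by the test itself via rpk, and the connection options come from the RISEDEV_KAFKA_WITH_OPTIONS_COMMON substitution rather than a hard-coded properties.bootstrap.server. A minimal sketch of that pattern, with illustrative object names that are not taken from the diff:

control substitution on

# Provision a dedicated topic inside the test; "|| true" tolerates a missing topic.
system ok
rpk topic delete demo_alter_topic || true

system ok
rpk topic create demo_alter_topic -p 1

# The substitution supplies the connector and broker options from the
# source_inline harness, so no bootstrap server is hard-coded here.
statement ok
CREATE SOURCE demo_src (v1 int) WITH (
    ${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
    topic = 'demo_alter_topic',
    scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;

statement ok
ALTER SOURCE demo_src ADD COLUMN v2 varchar;
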
@@ -1,3 +1,5 @@
control substitution on

############## Create kafka seed data

statement ok
@@ -12,11 +14,10 @@ statement ok
create sink kafka_sink
from
kafka_seed_data with (
properties.bootstrap.server = 'message_queue:29092',
topic = 'kafka_source',
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'test_rate_limit',
type = 'append-only',
force_append_only='true',
connector = 'kafka'
force_append_only='true'
);

############## Source from kafka (rate_limit = 0)
@@ -27,9 +28,8 @@ sleep 5s

statement ok
create source kafka_source (v1 int) with (
connector = 'kafka',
topic = 'kafka_source',
properties.bootstrap.server = 'message_queue:29092',
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'test_rate_limit',
scan.startup.mode = 'earliest',
source_rate_limit = 0,
) FORMAT PLAIN ENCODE JSON
@@ -126,4 +126,4 @@ statement ok
drop sink kafka_sink;

statement ok
drop table kafka_seed_data;
@@ -1,3 +1,5 @@
control substitution on

############## Create kafka seed data

statement ok
@@ -12,11 +14,10 @@ statement ok
create sink kafka_sink
from
kafka_seed_data with (
properties.bootstrap.server = 'message_queue:29092',
topic = 'rate_limit_source_kafka_0',
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'test_rate_limit_for_table',
type = 'append-only',
force_append_only='true',
connector = 'kafka'
);

# topic may not be created yet
@@ -26,9 +27,8 @@ sleep 4s

statement ok
create table kafka_source (v1 int) with (
connector = 'kafka',
topic = 'rate_limit_source_kafka_0',
properties.bootstrap.server = 'message_queue:29092',
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'test_rate_limit_for_table',
scan.startup.mode = 'earliest',
source_rate_limit = 0
) FORMAT PLAIN ENCODE JSON
@@ -101,4 +101,4 @@ statement ok
drop sink kafka_sink;

statement ok
drop table kafka_seed_data;
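
The rate-limit tests follow the same migration: both the sink that seeds the topic and the rate-limited consumer take their connection options from the RISEDEV_KAFKA_WITH_OPTIONS_COMMON substitution and use dedicated topic names (test_rate_limit, test_rate_limit_for_table) so the tests can run in isolation. A minimal sketch of the setup, using illustrative names; the steps that later adjust the limit are not visible in the hunks above:

control substitution on

# Seed table whose rows are pushed to Kafka through an append-only sink.
statement ok
create table demo_seed_data (v1 int);

statement ok
create sink demo_sink from demo_seed_data with (
    ${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
    topic = 'demo_rate_limit',
    type = 'append-only',
    force_append_only = 'true'
);

# Consume the same topic with the rate limit set to 0, so no rows flow
# until the test raises the limit later on.
statement ok
create source demo_limited_source (v1 int) with (
    ${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
    topic = 'demo_rate_limit',
    scan.startup.mode = 'earliest',
    source_rate_limit = 0
) FORMAT PLAIN ENCODE JSON;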