Skip to content

Commit

Permalink
refactor(test): move some tests from source_legacy to source_inline
Browse files Browse the repository at this point in the history
  • Loading branch information
xxchan committed Oct 14, 2024
1 parent 883bcb1 commit eaebb9c
Show file tree
Hide file tree
Showing 22 changed files with 61 additions and 73 deletions.
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ CREATE TABLE diamonds (
) WITH (
connector = 'posix_fs',
match_pattern = 'data*.csv',
posix_fs.root = 'e2e_test/source_legacy/opendal/data',
posix_fs.root = 'e2e_test/source_inline/fs/data',
) FORMAT PLAIN ENCODE CSV ( without_header = 'false', delimiter = ',');

sleep 10s
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
@@ -1,9 +1,24 @@
control substitution on

system ok
rpk topic delete json_timestamptz_handling_mode || true

system ok
rpk topic create json_timestamptz_handling_mode -p 1

system ok
cat <<EOF | rpk topic produce json_timestamptz_handling_mode -f "%v\n"
{"case":"0 number small","payload":{"after":{"case":"0 number small","at":100},"op":"r"}}
{"case":"1 number recent","payload":{"after":{"case":"1 number recent","at":1712800800123456},"op":"r"}}
{"case":"2 string utc","payload":{"after":{"case":"2 string utc","at":"2024-04-11T02:00:00.654321Z"},"op":"r"}}
{"case":"3 string naive","payload":{"after":{"case":"3 string naive","at":"2024-04-11 02:00:00.234321"},"op":"r"}}
EOF

statement error unrecognized
create table t (
payload struct<after struct<"case" varchar, at timestamptz>>)
with (
connector = 'kafka',
properties.bootstrap.server='message_queue:29092',
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'json_timestamptz_handling_mode')
format plain encode json (timestamptz.handling.mode = 'mili');

Expand All @@ -13,8 +28,7 @@ create table plain_guess (
"case" varchar,
payload struct<after struct<"case" varchar, at timestamptz>>)
with (
connector = 'kafka',
properties.bootstrap.server='message_queue:29092',
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'json_timestamptz_handling_mode')
format plain encode json (timestamptz.handling.mod = 'mili');

Expand All @@ -23,8 +37,7 @@ create table plain_milli (
"case" varchar,
payload struct<after struct<"case" varchar, at timestamptz>>)
with (
connector = 'kafka',
properties.bootstrap.server='message_queue:29092',
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'json_timestamptz_handling_mode')
format plain encode json (timestamptz.handling.mode = 'milli');

Expand All @@ -33,8 +46,7 @@ create table plain_micro (
"case" varchar,
payload struct<after struct<"case" varchar, at timestamptz>>)
with (
connector = 'kafka',
properties.bootstrap.server='message_queue:29092',
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'json_timestamptz_handling_mode')
format plain encode json (timestamptz.handling.mode = 'micro');

Expand All @@ -43,8 +55,7 @@ create table plain_utc (
"case" varchar,
payload struct<after struct<"case" varchar, at timestamptz>>)
with (
connector = 'kafka',
properties.bootstrap.server='message_queue:29092',
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'json_timestamptz_handling_mode')
format plain encode json (timestamptz.handling.mode = 'utc_string');

Expand All @@ -53,17 +64,15 @@ create table plain_naive (
"case" varchar,
payload struct<after struct<"case" varchar, at timestamptz>>)
with (
connector = 'kafka',
properties.bootstrap.server='message_queue:29092',
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'json_timestamptz_handling_mode')
format plain encode json (timestamptz.handling.mode = 'utc_without_suffix');

statement ok
create table debezium_milli (
"case" varchar, at timestamptz, primary key("case"))
with (
connector = 'kafka',
properties.bootstrap.server='message_queue:29092',
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'json_timestamptz_handling_mode')
format debezium encode json (timestamptz.handling.mode = 'milli');

Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
@@ -1,8 +1,23 @@
control substitution on

system ok
rpk topic delete test_temporary_kafka_batch || true

system ok
rpk topic create test_temporary_kafka_batch -p 1

system ok
cat <<EOF | rpk topic produce test_temporary_kafka_batch -f "%v\n"
{"v1": 1, "v2": "1"}
{"v1": 2, "v2": "22"}
{"v1": 3, "v2": "333"}
{"v1": 4, "v2": "4444"}
EOF

statement ok
create temporary source s1 (v1 int, v2 varchar) with (
connector = 'kafka',
topic = 'kafka_1_partition_topic',
properties.bootstrap.server = 'message_queue:29092',
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'test_temporary_kafka_batch',
scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON

Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,23 @@
control substitution on

system ok
rpk topic delete test_ttl_table_with_con || true

system ok
rpk topic create test_ttl_table_with_con -p 1

system ok
cat <<EOF | rpk topic produce test_ttl_table_with_con -f "%v\n"
{"v1": 1, "v2": "1"}
{"v1": 2, "v2": "22"}
{"v1": 3, "v2": "333"}
{"v1": 4, "v2": "4444"}
EOF

statement ok
create table t (v1 int, v2 varchar) APPEND ONLY with (
connector = 'kafka',
topic = 'kafka_1_partition_topic',
properties.bootstrap.server = 'message_queue:29092',
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'test_ttl_table_with_con',
scan.startup.mode = 'earliest',
retention_seconds = 5
) FORMAT PLAIN ENCODE JSON;
Expand Down

This file was deleted.

This file was deleted.

1 change: 0 additions & 1 deletion e2e_test/source_legacy/basic/scripts/test_data/weiling.1

This file was deleted.

2 changes: 1 addition & 1 deletion src/stream/src/executor/source/source_backfill_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ impl BackfillStage {
vis
}

/// Updates backfill states and returns whether the row from upstream `SourceExecutor` is visible.
/// Updates backfill states and returns whether the row backfilled from external system is visible.
fn handle_backfill_row(&mut self, split_id: &str, offset: &str) -> bool {
let state = self.states.get_mut(split_id).unwrap();
match state {
Expand Down

0 comments on commit eaebb9c

Please sign in to comment.