Skip to content

Commit

Permalink
fix(source): fix panic for ALTER SOURCE with schema registry (#17293)
Browse files Browse the repository at this point in the history
Signed-off-by: xxchan <xxchan22f@gmail.com>
  • Loading branch information
xxchan authored and xxchan committed Jun 19, 2024
1 parent 0436617 commit 9aff870
Show file tree
Hide file tree
Showing 24 changed files with 374 additions and 201 deletions.
15 changes: 1 addition & 14 deletions ci/scripts/e2e-source-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ tar xf ./risingwave-connector.tar.gz -C ./connector-node

echo "--- Install dependencies"
python3 -m pip install --break-system-packages requests protobuf fastavro confluent_kafka jsonschema
apt-get -y install jq

echo "--- e2e, inline test"
RUST_LOG="debug,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info" \
Expand Down Expand Up @@ -134,21 +135,7 @@ risedev slt './e2e_test/source/cdc/cdc_share_stream_drop.slt'

echo "--- Kill cluster"
risedev ci-kill

echo "--- e2e, ci-1cn-1fe, protobuf schema registry"
export RISINGWAVE_CI=true
RUST_LOG="info,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info" \
risedev ci-start ci-1cn-1fe
python3 e2e_test/schema_registry/pb.py "message_queue:29092" "http://schemaregistry:8082" "sr_pb_test" 20 user
echo "make sure google/protobuf/source_context.proto is NOT in schema registry"
curl --silent 'http://schemaregistry:8082/subjects'; echo
# curl --silent --head -X GET 'http://schemaregistry:8082/subjects/google%2Fprotobuf%2Fsource_context.proto/versions' | grep 404
curl --silent 'http://schemaregistry:8082/subjects' | grep -v 'google/protobuf/source_context.proto'
risedev slt './e2e_test/schema_registry/pb.slt'
risedev slt './e2e_test/schema_registry/alter_sr.slt'

echo "--- Kill cluster"
risedev ci-kill

echo "--- e2e, ci-kafka-plus-pubsub, kafka and pubsub source"
RUST_LOG="info,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info" \
Expand Down
80 changes: 0 additions & 80 deletions e2e_test/schema_registry/alter_sr.slt

This file was deleted.

50 changes: 0 additions & 50 deletions e2e_test/schema_registry/pb.slt

This file was deleted.

6 changes: 6 additions & 0 deletions e2e_test/source_inline/commands.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,12 @@ set -e
if [ -n "${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}" ]; then
echo "Deleting all Kafka topics..."
rpk topic delete -r "*"
echo "Deleting all schema registry subjects"
rpk sr subject list | while read -r subject; do
echo "Deleting schema registry subject: $subject"
rpk sr subject delete "$subject"
rpk sr subject delete "$subject" --permanent
done
else
echo "No Kafka to clean."
fi
Expand Down
70 changes: 70 additions & 0 deletions e2e_test/source_inline/kafka/avro/alter_source.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
control substitution on

# https://github.com/risingwavelabs/risingwave/issues/16486

# cleanup
system ok
rpk topic delete 'avro_alter_source_test' || true; \\
(rpk sr subject delete 'avro_alter_source_test-value' && rpk sr subject delete 'avro_alter_source_test-value' --permanent) || true;

# create topic and sr subject
system ok
rpk topic create 'avro_alter_source_test'

system ok
echo '{"type":"record","name":"Root","fields":[{"name":"foo","type":"string"}]}' | jq '{"schema": tojson}' \\
| curl -X POST -H 'content-type:application/json' -d @- '${RISEDEV_SCHEMA_REGISTRY_URL}/subjects/avro_alter_source_test-value/versions'

statement ok
create source s
WITH (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'avro_alter_source_test'
)
FORMAT PLAIN ENCODE AVRO (
schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL}'
);

# create a new version of schema and produce a message
system ok
echo '{"type":"record","name":"Root","fields":[{"name":"bar","type":"int","default":0},{"name":"foo","type":"string"}]}' | jq '{"schema": tojson}' \\
| curl -X POST -H 'content-type:application/json' -d @- '${RISEDEV_SCHEMA_REGISTRY_URL}/subjects/avro_alter_source_test-value/versions'

system ok
echo '{"foo":"ABC", "bar":1}' | rpk topic produce --schema-id=topic avro_alter_source_test

query ?
select * from s
----
ABC

statement error
alter source s format plain encode json;
----
db error: ERROR: Failed to run the query

Caused by:
Feature is not yet implemented: the original definition is FORMAT Plain ENCODE Avro, and altering them is not supported yet
No tracking issue yet. Feel free to submit a feature request at https://github.com/risingwavelabs/risingwave/issues/new?labels=type%2Ffeature&template=feature_request.yml


statement ok
alter source s format plain encode avro (schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL}');

query ??
select * from s
----
ABC 1

statement ok
create materialized view mv as select * from s;

sleep 2s

query ??
select * from mv
----
ABC 1

statement ok
drop source s cascade;
91 changes: 91 additions & 0 deletions e2e_test/source_inline/kafka/protobuf/alter_source.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
control substitution on

system ok
rpk topic delete sr_pb_test || true; \\
(rpk sr subject delete 'sr_pb_test-value' && rpk sr subject delete 'sr_pb_test-value' --permanent) || true;

system ok
python3 e2e_test/source_inline/kafka/protobuf/pb.py "${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}" "${RISEDEV_SCHEMA_REGISTRY_URL}" "sr_pb_test" 20 user

statement ok
CREATE SOURCE src_user
INCLUDE timestamp -- include explicitly here to test a bug found in https://github.com/risingwavelabs/risingwave/pull/17293
WITH (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'sr_pb_test',
scan.startup.mode = 'earliest'
)
FORMAT PLAIN ENCODE PROTOBUF(
schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL}',
message = 'test.User'
);

statement ok
CREATE MATERIALIZED VIEW mv_user AS SELECT * FROM src_user;

statement ok
CREATE TABLE t_user WITH (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'sr_pb_test',
scan.startup.mode = 'earliest'
)
FORMAT PLAIN ENCODE PROTOBUF(
schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL}',
message = 'test.User'
);

statement error
SELECT age FROM mv_user;

statement error
SELECT age FROM t_user;

# Push more events with extended fields
system ok
python3 e2e_test/source_inline/kafka/protobuf/pb.py "${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}" "${RISEDEV_SCHEMA_REGISTRY_URL}" "sr_pb_test" 5 user_with_more_fields

sleep 5s

# Refresh source schema
statement ok
ALTER SOURCE src_user REFRESH SCHEMA;

statement ok
CREATE MATERIALIZED VIEW mv_user_more AS SELECT * FROM src_user;

# Refresh table schema. It consume new data before refresh, so the new fields are NULLs
statement ok
ALTER TABLE t_user REFRESH SCHEMA;

query ????
SELECT COUNT(*), MAX(age), MIN(age), SUM(age) FROM mv_user_more;
----
25 104 0 510

query ????
SELECT COUNT(*), MAX(age), MIN(age), SUM(age) FROM t_user;
----
25 NULL NULL NULL

# Push more events with extended fields
system ok
python3 e2e_test/source_inline/kafka/protobuf/pb.py "${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}" "${RISEDEV_SCHEMA_REGISTRY_URL}" "sr_pb_test" 5 user_with_more_fields

sleep 5s

query ????
SELECT COUNT(*), MAX(age), MIN(age), SUM(age) FROM t_user;
----
30 104 100 510

statement ok
DROP MATERIALIZED VIEW mv_user_more;

statement ok
DROP TABLE t_user;

statement ok
DROP MATERIALIZED VIEW mv_user;

statement ok
DROP SOURCE src_user;
58 changes: 58 additions & 0 deletions e2e_test/source_inline/kafka/protobuf/basic.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
control substitution on

system ok
rpk topic delete sr_pb_test || true; \\
(rpk sr subject delete 'sr_pb_test-value' && rpk sr subject delete 'sr_pb_test-value' --permanent) || true;

system ok
python3 e2e_test/source_inline/kafka/protobuf/pb.py "${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}" "${RISEDEV_SCHEMA_REGISTRY_URL}" "sr_pb_test" 20 user

# make sure google/protobuf/source_context.proto is NOT in schema registry
system ok
curl --silent '${RISEDEV_SCHEMA_REGISTRY_URL}' | grep -v 'google/protobuf/source_context.proto'

# Create a table.
statement ok
create table sr_pb_test with (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'sr_pb_test',
scan.startup.mode = 'earliest')
FORMAT plain ENCODE protobuf(
schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL}',
message = 'test.User'
);

# for multiple schema registry nodes
statement ok
create table sr_pb_test_bk with (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'sr_pb_test',
scan.startup.mode = 'earliest')
FORMAT plain ENCODE protobuf(
schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL},${RISEDEV_SCHEMA_REGISTRY_URL}',
message = 'test.User'
);

# Wait for source
sleep 2s

# Flush into storage
statement ok
flush;

query I
select count(*) from sr_pb_test;
----
20

query IT
select min(id), max(id), max((sc).file_name) from sr_pb_test;
----
0 19 source/context_019.proto


statement ok
drop table sr_pb_test;

statement ok
drop table sr_pb_test_bk;
Loading

0 comments on commit 9aff870

Please sign in to comment.