Skip to content

Commit

Permalink
test(ticdc): add run pulsar cluster in integration shell script (#10654)
Browse files Browse the repository at this point in the history
ref #10653
  • Loading branch information
sdojjy authored Mar 11, 2024
1 parent d19833e commit 8af761c
Show file tree
Hide file tree
Showing 55 changed files with 359 additions and 52 deletions.
151 changes: 151 additions & 0 deletions tests/integration_tests/_utils/run_pulsar_cluster
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
#!/bin/bash

# parameter 1: work directory
# parameter 2: cluster_type, mtls or oauth, otherwise use default configuration to start pulsar cluster

set -eux

echo "[$(date)] <<<<<< START pulsar cluster in $TEST_NAME case >>>>>>"
workdir=$1
cluster_type=$2

cd $workdir

DEFAULT_PULSAR_HOME="/usr/local/pulsar"
# use PULSAR_HOME if it is set, otherwise use default pulsar home
pulsar_dir=${PULSAR_HOME:-$DEFAULT_PULSAR_HOME}

mtls_conf=$(
cat <<-EOF
authenticationEnabled=true
authenticationProviders=org.apache.pulsar.broker.authentication.AuthenticationProviderTls
brokerClientTlsEnabled=true
brokerClientTrustCertsFilePath=${workdir}/ca.cert.pem
brokerClientAuthenticationPlugin=org.apache.pulsar.client.impl.auth.AuthenticationTls
brokerClientAuthenticationParameters={"tlsCertFile":"${workdir}/broker_client.cert.pem","tlsKeyFile":"${workdir}/broker_client.key-pk8.pem"}
brokerServicePortTls=6651
webServicePortTls=8443
tlsTrustCertsFilePath=${workdir}/ca.cert.pem
tlsCertificateFilePath=${workdir}/server.cert.pem
tlsKeyFilePath=${workdir}/server.key-pk8.pem
tlsRequireTrustedClientCertOnConnect=true
tlsAllowInsecureConnection=false
tlsCertRefreshCheckDurationSec=300
EOF
)

normal_client_conf=$(
cat <<-EOF
webServiceUrl=http://localhost:8080/
brokerServiceUrl=pulsar://localhost:6650/
EOF
)

mtls_client_conf=$(
cat <<-EOF
webServiceUrl=https://localhost:8443/
brokerServiceUrl=pulsar+ssl://localhost:6651/
authPlugin=org.apache.pulsar.client.impl.auth.AuthenticationTls
authParams=tlsCertFile:${workdir}/broker_client.cert.pem,tlsKeyFile:${workdir}/broker_client.key-pk8.pem
tlsTrustCertsFilePath=${workdir}/ca.cert.pem
EOF
)

oauth_conf=$(
cat <<-EOF
authenticationEnabled=true
authenticationProviders=org.apache.pulsar.broker.authentication.AuthenticationProviderToken
brokerClientAuthenticationPlugin=org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2
brokerClientAuthenticationParameters={"privateKey":"file://${workdir}/privateKey","audience":"https://dev-kt-aa9ne.us.auth0.com/api/v2/","issuerUrl":"https://dev-kt-aa9ne.us.auth0.com"}
tokenSecretKey=file://${workdir}/secret.key
EOF
)

cert_server_conf=$(
cat <<-'EOF'
[ req ]
default_bits = 2048
prompt = no
default_md = sha256
distinguished_name = dn
[ v3_ext ]
authorityKeyIdentifier=keyid,issuer:always
basicConstraints=CA:FALSE
keyUsage=critical, digitalSignature, keyEncipherment
extendedKeyUsage=serverAuth
subjectAltName=@alt_names
[ dn ]
CN = server
[ alt_names ]
DNS.1 = localhost
IP.1 = 127.0.0.1
EOF
)

function gen_mtls_config() {
openssl genrsa -out ca.key.pem 2048
openssl req -x509 -new -nodes -key ca.key.pem -subj "/CN=CARoot" -days 365 -out ca.cert.pem
openssl genrsa -out server.key.pem 2048
openssl pkcs8 -topk8 -inform PEM -outform PEM -in server.key.pem -out server.key-pk8.pem -nocrypt
echo "$cert_server_conf" >server.conf
openssl req -new -config server.conf -key server.key.pem -out server.csr.pem -sha256
openssl x509 -req -in server.csr.pem -CA ca.cert.pem -CAkey ca.key.pem -CAcreateserial -out server.cert.pem -days 365 -extensions v3_ext -extfile server.conf -sha256
openssl genrsa -out broker_client.key.pem 2048
openssl pkcs8 -topk8 -inform PEM -outform PEM -in broker_client.key.pem -out broker_client.key-pk8.pem -nocrypt
openssl req -new -subj "/CN=broker_client" -key broker_client.key.pem -out broker_client.csr.pem -sha256
openssl x509 -req -in broker_client.csr.pem -CA ca.cert.pem -CAkey ca.key.pem -CAcreateserial -out broker_client.cert.pem -days 365 -sha256
echo "$mtls_conf" >>${workdir}/pulsar_standalone.conf
echo "$mtls_client_conf" >${pulsar_dir}/conf/client.conf
}

function gen_oauth_config() {
echo "$oauth_conf" >>${workdir}/pulsar_standalone.conf
}

echo "$normal_client_conf" >${pulsar_dir}/conf/client.conf
# copy the origin config to work directory
cp $pulsar_dir/conf/standalone.conf ${workdir}/pulsar_standalone.conf
pulsar_port=6650
if [ "$cluster_type" == "mtls" ]; then
pulsar_port=6651
gen_mtls_config
elif [ "$cluster_type" == "oauth" ]; then
gen_oauth_config
else
echo "no cluster type specified, using default configuration."
fi

echo "[$(date)] <<<<<< START pulsar cluster in $cluster_type mode in $TEST_NAME case >>>>>>"
$pulsar_dir/bin/pulsar standalone --config $workdir/pulsar_standalone.conf -nfw --metadata-dir $workdir/pulsar-metadata --bookkeeper-dir $workdir/pulsar-bookie >>$workdir/pulsar_stdout.log 2>&1 &
echo "Waiting for pulsar port to be ready..."
i=0
while ! nc -z localhost "$pulsar_port"; do
i=$((i + 1))
if [ "$i" -gt 10 ]; then
cat $workdir/pulsar_stdout.log
echo 'Failed to start pulsar'
exit 1
fi
sleep 2
done

echo "Waiting for pulsar namespace to be ready..."
i=0
while ! $pulsar_dir/bin/pulsar-admin namespaces list public; do
i=$((i + 1))
if [ "$i" -gt 10 ]; then
cat $workdir/pulsar_stdout.log
echo 'Failed to list pulsar namespace'
exit 1
fi
sleep 2
done
echo "[$(date)] <<<<<< pulsar is ready >>>>>>"
2 changes: 2 additions & 0 deletions tests/integration_tests/_utils/stop_tidb_cluster
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,5 @@ kill -9 $(lsof -i tcp:${DOWN_TIKV_PORT} -t 2>/dev/null) &>/dev/null || true
kill -9 $(lsof -i tcp:${DOWN_TIKV_STATUS_PORT} -t 2>/dev/null) &>/dev/null || true
kill -9 $(lsof -i tcp:9500 -t 2>/dev/null) &>/dev/null || true
kill -9 $(lsof -i tcp:17000 -t 2>/dev/null) &>/dev/null || true
kill -9 $(lsof -i tcp:6650 -t 2>/dev/null) &>/dev/null || true
kill -9 $(lsof -i tcp:6651 -t 2>/dev/null) &>/dev/null || true
5 changes: 4 additions & 1 deletion tests/integration_tests/autorandom/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@ function run() {
case $SINK_TYPE in
kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;;
pulsar) SINK_URI="pulsar://127.0.0.1:6650/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;;
pulsar)
run_pulsar_cluster $WORK_DIR normal
SINK_URI="pulsar://127.0.0.1:6650/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true"
;;
*) SINK_URI="mysql://normal:123456@127.0.0.1:3306/" ;;
esac
cdc cli changefeed create --sink-uri="$SINK_URI"
Expand Down
5 changes: 4 additions & 1 deletion tests/integration_tests/batch_add_table/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,10 @@ function run() {
case $SINK_TYPE in
kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;;
pulsar) SINK_URI="pulsar://127.0.0.1:6650/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;;
pulsar)
run_pulsar_cluster $WORK_DIR normal
SINK_URI="pulsar://127.0.0.1:6650/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true"
;;
*) SINK_URI="mysql://normal:123456@127.0.0.1:3306/" ;;
esac
run_cdc_cli changefeed create --sink-uri="$SINK_URI"
Expand Down
6 changes: 4 additions & 2 deletions tests/integration_tests/big_txn/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@ function run() {
case $SINK_TYPE in
kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;;
pulsar) SINK_URI="pulsar://127.0.0.1:6650/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;;
pulsar)
run_pulsar_cluster $WORK_DIR normal
SINK_URI="pulsar://127.0.0.1:6650/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true"
;;
*) SINK_URI="mysql://normal:123456@127.0.0.1:3306?transaction-atomicity=none" ;;
esac
run_cdc_cli changefeed create --sink-uri="$SINK_URI"
Expand All @@ -51,7 +54,6 @@ function run() {

check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml

cleanup_process $CDC_BINARY
}

trap stop_tidb_cluster EXIT
Expand Down
1 change: 1 addition & 0 deletions tests/integration_tests/canal_json_basic/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ function run() {
fi

if [ "$SINK_TYPE" == "pulsar" ]; then
run_pulsar_cluster $WORK_DIR normal
SINK_URI="pulsar://127.0.0.1:6650/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true"
fi

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ function run() {
fi

if [ "$SINK_TYPE" == "pulsar" ]; then
run_pulsar_cluster $WORK_DIR normal
SINK_URI="pulsar://127.0.0.1:6650/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true"
fi

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@ function run() {
case $SINK_TYPE in
kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;;
pulsar) SINK_URI="pulsar://127.0.0.1:6650/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;;
pulsar)
run_pulsar_cluster $WORK_DIR normal
SINK_URI="pulsar://127.0.0.1:6650/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true"
;;
*) SINK_URI="mysql://normal:123456@127.0.0.1:3306/?max-txn-row=1" ;;
esac

Expand Down
5 changes: 4 additions & 1 deletion tests/integration_tests/cdc/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@ function prepare() {
case $SINK_TYPE in
kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;;
pulsar) SINK_URI="pulsar://127.0.0.1:6650/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;;
pulsar)
run_pulsar_cluster $WORK_DIR normal
SINK_URI="pulsar://127.0.0.1:6650/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true"
;;
*) SINK_URI="mysql://normal:123456@127.0.0.1:3306/" ;;
esac
run_cdc_cli changefeed create --sink-uri="$SINK_URI"
Expand Down
5 changes: 4 additions & 1 deletion tests/integration_tests/cdc_server_tips/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,10 @@ function try_to_run_cdc() {
case $SINK_TYPE in
kafka) SINK_URI="kafka+ssl://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-client-id=cdc_server_tips&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;;
pulsar) SINK_URI="pulsar://127.0.0.1:6650/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;;
pulsar)
run_pulsar_cluster $WORK_DIR normal
SINK_URI="pulsar://127.0.0.1:6650/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true"
;;
*) SINK_URI="mysql+ssl://normal:123456@127.0.0.1:3306/" ;;
esac
run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI"
Expand Down
5 changes: 4 additions & 1 deletion tests/integration_tests/changefeed_auto_stop/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,10 @@ function run() {
case $SINK_TYPE in
kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;;
pulsar) SINK_URI="pulsar://127.0.0.1:6650/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;;
pulsar)
run_pulsar_cluster $WORK_DIR normal
SINK_URI="pulsar://127.0.0.1:6650/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true"
;;
*) SINK_URI="mysql://normal:123456@127.0.0.1:3306/" ;;
esac
changefeedid=$(cdc cli changefeed create --pd="http://${UP_PD_HOST_1}:${UP_PD_PORT_1}" --start-ts=$start_ts --sink-uri="$SINK_URI" 2>&1 | tail -n2 | head -n1 | awk '{print $2}')
Expand Down
5 changes: 4 additions & 1 deletion tests/integration_tests/changefeed_error/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,10 @@ function run() {
case $SINK_TYPE in
kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;;
pulsar) SINK_URI="pulsar://127.0.0.1:6650/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;;
pulsar)
run_pulsar_cluster $WORK_DIR normal
SINK_URI="pulsar://127.0.0.1:6650/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true"
;;
*) SINK_URI="mysql://normal:123456@127.0.0.1:3306/?max-txn-row=1" ;;
esac
changefeedid="changefeed-error"
Expand Down
5 changes: 4 additions & 1 deletion tests/integration_tests/changefeed_finish/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ function run() {
case $SINK_TYPE in
kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;;
pulsar) SINK_URI="pulsar://127.0.0.1:6650/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;;
pulsar)
run_pulsar_cluster $WORK_DIR normal
SINK_URI="pulsar://127.0.0.1:6650/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true"
;;
*) SINK_URI="mysql://normal:123456@127.0.0.1:3306/?max-txn-row=1" ;;
esac

Expand Down
5 changes: 4 additions & 1 deletion tests/integration_tests/changefeed_pause_resume/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ function run() {
case $SINK_TYPE in
kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;;
pulsar) SINK_URI="pulsar://127.0.0.1:6650/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;;
pulsar)
run_pulsar_cluster $WORK_DIR normal
SINK_URI="pulsar://127.0.0.1:6650/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true"
;;
*) SINK_URI="mysql://normal:123456@127.0.0.1:3306/?max-txn-row=1" ;;
esac

Expand Down
5 changes: 4 additions & 1 deletion tests/integration_tests/changefeed_reconstruct/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,10 @@ function run() {
case $SINK_TYPE in
kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;;
pulsar) SINK_URI="pulsar://127.0.0.1:6650/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;;
pulsar)
run_pulsar_cluster $WORK_DIR normal
SINK_URI="pulsar://127.0.0.1:6650/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true"
;;
*) SINK_URI="mysql://normal:123456@127.0.0.1:3306/?max-txn-row=1" ;;
esac

Expand Down
5 changes: 4 additions & 1 deletion tests/integration_tests/charset_gbk/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,10 @@ function run() {
case $SINK_TYPE in
kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;;
pulsar) SINK_URI="pulsar://127.0.0.1:6650/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;;
pulsar)
run_pulsar_cluster $WORK_DIR normal
SINK_URI="pulsar://127.0.0.1:6650/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true"
;;
*) SINK_URI="mysql://normal:123456@127.0.0.1:3306/" ;;
esac
SINK_URI="mysql://normal:123456@127.0.0.1:3306/"
Expand Down
5 changes: 4 additions & 1 deletion tests/integration_tests/cli/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,10 @@ function run() {
case $SINK_TYPE in
kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;;
pulsar) SINK_URI="pulsar://127.0.0.1:6650/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;;
pulsar)
run_pulsar_cluster $WORK_DIR normal
SINK_URI="pulsar://127.0.0.1:6650/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true"
;;
*) SINK_URI="mysql://normal:123456@127.0.0.1:3306/" ;;
esac

Expand Down
5 changes: 4 additions & 1 deletion tests/integration_tests/clustered_index/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,10 @@ function run() {
case $SINK_TYPE in
kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;;
pulsar) SINK_URI="pulsar://127.0.0.1:6650/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;;
pulsar)
run_pulsar_cluster $WORK_DIR normal
SINK_URI="pulsar://127.0.0.1:6650/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true"
;;
*) SINK_URI="mysql://normal:123456@127.0.0.1:3306/" ;;
esac
cdc cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI"
Expand Down
5 changes: 4 additions & 1 deletion tests/integration_tests/common_1/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,10 @@ function run() {
case $SINK_TYPE in
kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;;
pulsar) SINK_URI="pulsar://127.0.0.1:6650/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;;
pulsar)
run_pulsar_cluster $WORK_DIR normal
SINK_URI="pulsar://127.0.0.1:6650/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true"
;;
*) SINK_URI="mysql://root@127.0.0.1:3306/" ;;
esac
run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI"
Expand Down
5 changes: 4 additions & 1 deletion tests/integration_tests/ddl_attributes/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,10 @@ function run() {
case $SINK_TYPE in
kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}" ;;
storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;;
pulsar) SINK_URI="pulsar://127.0.0.1:6650/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;;
pulsar)
run_pulsar_cluster $WORK_DIR normal
SINK_URI="pulsar://127.0.0.1:6650/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true"
;;
*) SINK_URI="mysql://root@127.0.0.1:3306/" ;;
esac

Expand Down
5 changes: 4 additions & 1 deletion tests/integration_tests/ddl_manager/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@ function run() {
case $SINK_TYPE in
kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;;
pulsar) SINK_URI="pulsar://127.0.0.1:6650/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;;
pulsar)
run_pulsar_cluster $WORK_DIR normal
SINK_URI="pulsar://127.0.0.1:6650/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true"
;;
*) SINK_URI="mysql://root@127.0.0.1:3306/" ;;
esac
changefeed_id="ddl-manager"
Expand Down
5 changes: 4 additions & 1 deletion tests/integration_tests/ddl_only_block_related_table/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,10 @@ function run() {
case $SINK_TYPE in
kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;;
pulsar) SINK_URI="pulsar://127.0.0.1:6650/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;;
pulsar)
run_pulsar_cluster $WORK_DIR normal
SINK_URI="pulsar://127.0.0.1:6650/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true"
;;
*) SINK_URI="mysql://root@127.0.0.1:3306/" ;;
esac
changefeed_id="ddl-only-block-related-table"
Expand Down
Loading

0 comments on commit 8af761c

Please sign in to comment.