From 3c01da4217995605d92306f8fc084bf8f0a70904 Mon Sep 17 00:00:00 2001
From: zhangyuge1 <1728375982@qq.com>
Date: Sun, 24 Jul 2022 16:59:05 +0800
Subject: [PATCH 01/17] add jdbc e2e test
---
seatunnel-core/seatunnel-core-spark/pom.xml | 6 ++
.../seatunnel-flink-starter/pom.xml | 11 +++
.../seatunnel-flink-connector-v2-e2e/pom.xml | 7 ++
.../e2e/flink/v2/jdbc/FakeSourceToJdbcIT.java | 79 +++++++++++++++++
.../flink/v2/jdbc/JdbcSourceToConsoleIT.java | 86 +++++++++++++++++++
.../resources/jdbc/fakesource_to_jdbc.conf | 62 +++++++++++++
.../resources/jdbc/jdbcsource_to_console.conf | 53 ++++++++++++
seatunnel-e2e/seatunnel-spark-e2e/pom.xml | 6 ++
.../e2e/spark/jdbc/FakeSourceToJdbcIT.java | 77 +++++++++++++++++
.../e2e/spark/jdbc/JdbcSourceToConsoleIT.java | 85 ++++++++++++++++++
.../resources/jdbc/fakesource_to_jdbc.conf | 60 +++++++++++++
.../resources/jdbc/jdbcsource_to_console.conf | 53 ++++++++++++
12 files changed, 585 insertions(+)
create mode 100644 seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/FakeSourceToJdbcIT.java
create mode 100644 seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcSourceToConsoleIT.java
create mode 100644 seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/jdbc/fakesource_to_jdbc.conf
create mode 100644 seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/jdbc/jdbcsource_to_console.conf
create mode 100644 seatunnel-e2e/seatunnel-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/jdbc/FakeSourceToJdbcIT.java
create mode 100644 seatunnel-e2e/seatunnel-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/jdbc/JdbcSourceToConsoleIT.java
create mode 100644 seatunnel-e2e/seatunnel-spark-e2e/src/test/resources/jdbc/fakesource_to_jdbc.conf
create mode 100644 seatunnel-e2e/seatunnel-spark-e2e/src/test/resources/jdbc/jdbcsource_to_console.conf
diff --git a/seatunnel-core/seatunnel-core-spark/pom.xml b/seatunnel-core/seatunnel-core-spark/pom.xml
index 5c1264b1850..21b4630230d 100644
--- a/seatunnel-core/seatunnel-core-spark/pom.xml
+++ b/seatunnel-core/seatunnel-core-spark/pom.xml
@@ -118,6 +118,12 @@
seatunnel-transform-spark-null-rate${project.version}
+
+
+ mysql
+ mysql-connector-java
+ compile
+
diff --git a/seatunnel-core/seatunnel-flink-starter/pom.xml b/seatunnel-core/seatunnel-flink-starter/pom.xml
index 6385b9c41f6..9253738069a 100644
--- a/seatunnel-core/seatunnel-flink-starter/pom.xml
+++ b/seatunnel-core/seatunnel-flink-starter/pom.xml
@@ -103,6 +103,17 @@
junit
+
+ mysql
+ mysql-connector-java
+ compile
+
+
+
+ org.apache.seatunnel
+ connector-jdbc
+ ${project.version}
+
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml
index c494f80f886..2aeef2f22d5 100644
--- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml
@@ -37,6 +37,13 @@
testcontainers
+
+ org.testcontainers
+ mysql
+ 1.17.3
+ test
+
+
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/FakeSourceToJdbcIT.java b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/FakeSourceToJdbcIT.java
new file mode 100644
index 00000000000..fe244fda2f3
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/FakeSourceToJdbcIT.java
@@ -0,0 +1,79 @@
+package org.apache.seatunnel.e2e.flink.v2.jdbc;
+
+import org.apache.seatunnel.e2e.flink.FlinkContainer;
+
+import com.google.common.collect.Lists;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.MySQLContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerImageName;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.List;
+import java.util.stream.Stream;
+
+public class FakeSourceToJdbcIT extends FlinkContainer {
+ private static final Logger LOGGER = LoggerFactory.getLogger(FakeSourceToJdbcIT.class);
+ private MySQLContainer> mysql;
+ private Connection connection;
+
+ @SuppressWarnings("checkstyle:MagicNumber")
+ @Before
+ public void startMysqlContainer() throws InterruptedException, ClassNotFoundException, SQLException {
+ mysql = new MySQLContainer<>(DockerImageName.parse("mysql:8.0"))
+ .withNetwork(NETWORK)
+ .withNetworkAliases("jdbc")
+ .withLogConsumer(new Slf4jLogConsumer(LOGGER));
+ mysql.setPortBindings(Lists.newArrayList("3306:3306"));
+ Startables.deepStart(Stream.of(mysql)).join();
+ LOGGER.info("Jdbc container started");
+ Thread.sleep(5000L);
+ Class.forName(mysql.getDriverClassName());
+ connection = DriverManager.getConnection(mysql.getJdbcUrl(), mysql.getUsername(), mysql.getPassword());
+ initializeJdbcTable();
+ }
+
+ private void initializeJdbcTable() throws SQLException {
+ Statement statement = connection.createStatement();
+ String sql = "CREATE TABLE test (\n" +
+ " name varchar(255) NOT NULL,\n" +
+ " age int NOT NULL\n" +
+ ")";
+ statement.executeUpdate(sql);
+ statement.close();
+ }
+
+ @Test
+ public void testFakeSourceToJdbcSink() throws SQLException, IOException, InterruptedException {
+ Container.ExecResult execResult = executeSeaTunnelFlinkJob("/jdbc/fakesource_to_jdbc.conf");
+ Assert.assertEquals(0, execResult.getExitCode());
+ // query result
+ String sql = "select * from test";
+ Statement statement = connection.createStatement();
+ ResultSet resultSet = statement.executeQuery(sql);
+ List result = Lists.newArrayList();
+ while (resultSet.next()) {
+ result.add(resultSet.getString("name"));
+ }
+ Assert.assertFalse(result.isEmpty());
+ }
+
+ @After
+ public void closeMysqlContainer() {
+ if (mysql != null) {
+ mysql.stop();
+ }
+ }
+}
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcSourceToConsoleIT.java b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcSourceToConsoleIT.java
new file mode 100644
index 00000000000..c19d76570a1
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcSourceToConsoleIT.java
@@ -0,0 +1,86 @@
+package org.apache.seatunnel.e2e.flink.v2.jdbc;
+
+import org.apache.seatunnel.e2e.flink.FlinkContainer;
+
+import com.google.common.collect.Lists;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.MySQLContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerImageName;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.stream.Stream;
+
+public class JdbcSourceToConsoleIT extends FlinkContainer {
+ private static final Logger LOGGER = LoggerFactory.getLogger(JdbcSourceToConsoleIT.class);
+ private MySQLContainer> mysql;
+ private Connection connection;
+
+ @SuppressWarnings("checkstyle:MagicNumber")
+ @Before
+ public void startMysqlContainer() throws InterruptedException, ClassNotFoundException, SQLException {
+ mysql = new MySQLContainer<>(DockerImageName.parse("mysql:8.0"))
+ .withNetwork(NETWORK)
+ .withNetworkAliases("jdbc")
+ .withLogConsumer(new Slf4jLogConsumer(LOGGER));
+ mysql.setPortBindings(Lists.newArrayList("3306:3306"));
+ Startables.deepStart(Stream.of(mysql)).join();
+ LOGGER.info("Jdbc container started");
+ // wait for clickhouse fully start
+ Thread.sleep(5000L);
+ Class.forName(mysql.getDriverClassName());
+ connection = DriverManager.getConnection(mysql.getJdbcUrl(), mysql.getUsername(), mysql.getPassword());
+ initializeJdbcTable();
+ batchInsertData();
+ }
+
+ private void initializeJdbcTable() throws SQLException {
+ Statement statement = connection.createStatement();
+ String sql = "CREATE TABLE test (\n" +
+ " name varchar(255) NOT NULL,\n" +
+ " age int NOT NULL\n" +
+ ")";
+ statement.executeUpdate(sql);
+ statement.close();
+ }
+
+ @SuppressWarnings("checkstyle:MagicNumber")
+ private void batchInsertData() throws SQLException {
+ String sql = "insert into test(name,age) values(?,?)";
+ connection.setAutoCommit(false);
+ PreparedStatement preparedStatement = connection.prepareStatement(sql);
+ for (int i = 0; i < 10; i++) {
+ preparedStatement.setString(1, "Mike");
+ preparedStatement.setInt(2, 20);
+ preparedStatement.addBatch();
+ }
+ preparedStatement.executeBatch();
+ connection.commit();
+ connection.close();
+ }
+
+ @Test
+ public void testFakeSourceToJdbcSink() throws SQLException, IOException, InterruptedException {
+ Container.ExecResult execResult = executeSeaTunnelFlinkJob("/jdbc/jdbcsource_to_console.conf");
+ Assert.assertEquals(0, execResult.getExitCode());
+ }
+
+ @After
+ public void closeMysqlContainer() {
+ if (mysql != null) {
+ mysql.stop();
+ }
+ }
+}
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/jdbc/fakesource_to_jdbc.conf b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/jdbc/fakesource_to_jdbc.conf
new file mode 100644
index 00000000000..5a543045d99
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/jdbc/fakesource_to_jdbc.conf
@@ -0,0 +1,62 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+ # You can set flink configuration here
+ execution.parallelism = 1
+ job.mode = "BATCH"
+ #execution.checkpoint.interval = 10000
+ #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+ # This is a example source plugin **only for test and demonstrate the feature source plugin**
+ FakeSource {
+ result_table_name = "fake"
+ field_name = "name,age"
+ }
+
+ # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
+ # please go to https://seatunnel.apache.org/docs/flink/configuration/source-plugins/Fake
+}
+
+transform {
+ sql {
+ sql = "select name,age from fake"
+ }
+
+ # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
+ # please go to https://seatunnel.apache.org/docs/flink/configuration/transform-plugins/Sql
+}
+
+sink {
+ Jdbc {
+ source_table_name = fake
+ driver = com.mysql.cj.jdbc.Driver
+ url = "jdbc:mysql://jdbc:3306/test"
+ user = test
+ password = test
+ query = "insert into test(name,age) values(?,?)"
+ batch_size = 2
+ }
+
+ # If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
+ # please go to https://seatunnel.apache.org/docs/flink/configuration/sink-plugins/Console
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/jdbc/jdbcsource_to_console.conf b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/jdbc/jdbcsource_to_console.conf
new file mode 100644
index 00000000000..77acadb24b5
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/jdbc/jdbcsource_to_console.conf
@@ -0,0 +1,53 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+ # You can set flink configuration here
+ execution.parallelism = 1
+ job.mode = "BATCH"
+ #execution.checkpoint.interval = 10000
+ #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+ # This is a example source plugin **only for test and demonstrate the feature source plugin**
+ Jdbc {
+ driver = com.mysql.cj.jdbc.Driver
+ url = "jdbc:mysql://jdbc:3306/test"
+ user = test
+ password = test
+ query = "select * from test"
+ }
+
+ # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
+ # please go to https://seatunnel.apache.org/docs/flink/configuration/source-plugins/Fake
+}
+
+transform {
+
+ # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
+ # please go to https://seatunnel.apache.org/docs/flink/configuration/transform-plugins/Sql
+}
+
+sink {
+ Console {}
+ # If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
+ # please go to https://seatunnel.apache.org/docs/flink/configuration/sink-plugins/Console
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-spark-e2e/pom.xml b/seatunnel-e2e/seatunnel-spark-e2e/pom.xml
index e6a57972901..9b18f701f3e 100644
--- a/seatunnel-e2e/seatunnel-spark-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-spark-e2e/pom.xml
@@ -41,6 +41,12 @@
org.testcontainerstestcontainers
+
+ org.testcontainers
+ mysql
+ 1.17.3
+ test
+
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/jdbc/FakeSourceToJdbcIT.java b/seatunnel-e2e/seatunnel-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/jdbc/FakeSourceToJdbcIT.java
new file mode 100644
index 00000000000..83f1d85b1e8
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/jdbc/FakeSourceToJdbcIT.java
@@ -0,0 +1,77 @@
+package org.apache.seatunnel.e2e.spark.jdbc;
+
+import org.apache.seatunnel.e2e.spark.SparkContainer;
+
+import com.google.common.collect.Lists;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.MySQLContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerImageName;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.List;
+import java.util.stream.Stream;
+
+public class FakeSourceToJdbcIT extends SparkContainer {
+ private static final Logger LOGGER = LoggerFactory.getLogger(FakeSourceToJdbcIT.class);
+ private MySQLContainer> mysql;
+ private Connection connection;
+
+ @SuppressWarnings("checkstyle:MagicNumber")
+ @Before
+ public void startMysqlContainer() throws InterruptedException, ClassNotFoundException, SQLException {
+ mysql = new MySQLContainer<>(DockerImageName.parse("mysql:8.0"))
+ .withNetwork(NETWORK)
+ .withNetworkAliases("jdbc")
+ .withLogConsumer(new Slf4jLogConsumer(LOGGER));
+ mysql.setPortBindings(Lists.newArrayList("33306:3306"));
+ Startables.deepStart(Stream.of(mysql)).join();
+ LOGGER.info("Jdbc container started");
+ Thread.sleep(5000L);
+ Class.forName(mysql.getDriverClassName());
+ connection = DriverManager.getConnection(mysql.getJdbcUrl(), mysql.getUsername(), mysql.getPassword());
+ initializeJdbcTable();
+ }
+
+ private void initializeJdbcTable() throws SQLException {
+ Statement statement = connection.createStatement();
+ String sql = "CREATE TABLE test (\n" +
+ " name varchar(255) NOT NULL\n" +
+ ")";
+ statement.executeUpdate(sql);
+ statement.close();
+ }
+
+ @Test
+ public void testFakeSourceToJdbcSink() throws SQLException, IOException, InterruptedException {
+ Container.ExecResult execResult = executeSeaTunnelSparkJob("/jdbc/fakesource_to_jdbc.conf");
+ Assert.assertEquals(0, execResult.getExitCode());
+ String sql = "select * from test";
+ Statement statement = connection.createStatement();
+ ResultSet resultSet = statement.executeQuery(sql);
+ List result = Lists.newArrayList();
+ while (resultSet.next()) {
+ result.add(resultSet.getString("name"));
+ }
+ Assert.assertFalse(result.isEmpty());
+ }
+
+ @After
+ public void closeMysqlContainer() {
+ if (mysql != null) {
+ mysql.stop();
+ }
+ }
+}
diff --git a/seatunnel-e2e/seatunnel-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/jdbc/JdbcSourceToConsoleIT.java b/seatunnel-e2e/seatunnel-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/jdbc/JdbcSourceToConsoleIT.java
new file mode 100644
index 00000000000..c2942d704b7
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/jdbc/JdbcSourceToConsoleIT.java
@@ -0,0 +1,85 @@
+package org.apache.seatunnel.e2e.spark.jdbc;
+
+import org.apache.seatunnel.e2e.spark.SparkContainer;
+
+import com.google.common.collect.Lists;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.MySQLContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerImageName;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.stream.Stream;
+
+public class JdbcSourceToConsoleIT extends SparkContainer {
+ private static final Logger LOGGER = LoggerFactory.getLogger(JdbcSourceToConsoleIT.class);
+ private MySQLContainer> mysql;
+ private Connection connection;
+
+ @SuppressWarnings("checkstyle:MagicNumber")
+ @Before
+ public void startMysqlContainer() throws InterruptedException, ClassNotFoundException, SQLException {
+ mysql = new MySQLContainer<>(DockerImageName.parse("mysql:8.0"))
+ .withNetwork(NETWORK)
+ .withNetworkAliases("jdbc")
+ .withLogConsumer(new Slf4jLogConsumer(LOGGER));
+ mysql.setPortBindings(Lists.newArrayList("33306:3306"));
+ Startables.deepStart(Stream.of(mysql)).join();
+ LOGGER.info("Jdbc container started");
+ Thread.sleep(5000L);
+ Class.forName(mysql.getDriverClassName());
+ connection = DriverManager.getConnection(mysql.getJdbcUrl(), mysql.getUsername(), mysql.getPassword());
+ initializeJdbcTable();
+ batchInsertData();
+ }
+
+ private void initializeJdbcTable() throws SQLException {
+ Statement statement = connection.createStatement();
+ String sql = "CREATE TABLE test (\n" +
+ " name varchar(255) NOT NULL,\n" +
+ " age int NOT NULL\n" +
+ ")";
+ statement.executeUpdate(sql);
+ statement.close();
+ }
+
+ @SuppressWarnings("checkstyle:MagicNumber")
+ private void batchInsertData() throws SQLException {
+ String sql = "insert into test(name,age) values(?,?)";
+ connection.setAutoCommit(false);
+ PreparedStatement preparedStatement = connection.prepareStatement(sql);
+ for (int i = 0; i < 10; i++) {
+ preparedStatement.setString(1, "Mike");
+ preparedStatement.setInt(2, 20);
+ preparedStatement.addBatch();
+ }
+ preparedStatement.executeBatch();
+ connection.commit();
+ connection.close();
+ }
+
+ @Test
+ public void testFakeSourceToJdbcSink() throws SQLException, IOException, InterruptedException {
+ Container.ExecResult execResult = executeSeaTunnelSparkJob("/jdbc/jdbcsource_to_console.conf");
+ Assert.assertEquals(0, execResult.getExitCode());
+ }
+
+ @After
+ public void closeMysqlContainer() {
+ if (mysql != null) {
+ mysql.stop();
+ }
+ }
+}
diff --git a/seatunnel-e2e/seatunnel-spark-e2e/src/test/resources/jdbc/fakesource_to_jdbc.conf b/seatunnel-e2e/seatunnel-spark-e2e/src/test/resources/jdbc/fakesource_to_jdbc.conf
new file mode 100644
index 00000000000..598efd7b8b2
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-spark-e2e/src/test/resources/jdbc/fakesource_to_jdbc.conf
@@ -0,0 +1,60 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 2
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+}
+
+source {
+ # This is a example source plugin **only for test and demonstrate the feature source plugin**
+ Fake {
+ result_table_name = "fake"
+ }
+
+ # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
+ # please go to https://seatunnel.apache.org/docs/flink/configuration/source-plugins/Fake
+}
+
+transform {
+ # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
+ # please go to https://seatunnel.apache.org/docs/flink/configuration/transform-plugins/Sql
+}
+
+sink {
+ jdbc {
+ driver = com.mysql.cj.jdbc.Driver
+ saveMode = "update",
+ truncate = "true",
+ url = "jdbc:mysql://jdbc:3306/test",
+ user = "test",
+ password = "test",
+ dbTable = "fake",
+ customUpdateStmt = "insert into test(name) values(?)"
+ jdbc.connect_timeout = 10000
+ jdbc.socket_timeout = 10000
+ }
+
+ # If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
+ # please go to https://seatunnel.apache.org/docs/flink/configuration/sink-plugins/Console
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-spark-e2e/src/test/resources/jdbc/jdbcsource_to_console.conf b/seatunnel-e2e/seatunnel-spark-e2e/src/test/resources/jdbc/jdbcsource_to_console.conf
new file mode 100644
index 00000000000..1ad856e0135
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-spark-e2e/src/test/resources/jdbc/jdbcsource_to_console.conf
@@ -0,0 +1,53 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 2
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+}
+
+source {
+ # This is a example source plugin **only for test and demonstrate the feature source plugin**
+ jdbc {
+ driver = com.mysql.cj.jdbc.Driver
+ url = "jdbc:mysql://jdbc:3306/test"
+ table = "test"
+ result_table_name = "test_log"
+ user = "test"
+ password = "test"
+ }
+ # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
+ # please go to https://seatunnel.apache.org/docs/flink/configuration/source-plugins/Fake
+}
+
+transform {
+ # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
+ # please go to https://seatunnel.apache.org/docs/flink/configuration/transform-plugins/Sql
+}
+
+sink {
+ Console {}
+
+ # If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
+ # please go to https://seatunnel.apache.org/docs/flink/configuration/sink-plugins/Console
+}
\ No newline at end of file
From 96f7667cf6f29fa0c66930be3d4bed388d3f6080 Mon Sep 17 00:00:00 2001
From: zhangyuge1 <1728375982@qq.com>
Date: Mon, 25 Jul 2022 22:37:48 +0800
Subject: [PATCH 02/17] add jdbc(postgresql) e2e test
---
.idea/vcs.xml | 28 +++-------
seatunnel-core/seatunnel-core-spark/pom.xml | 4 +-
.../seatunnel-flink-starter/pom.xml | 12 ++---
.../seatunnel-flink-connector-v2-e2e/pom.xml | 8 ++-
.../e2e/flink/v2/jdbc/FakeSourceToJdbcIT.java | 48 +++++++++++------
.../flink/v2/jdbc/JdbcSourceToConsoleIT.java | 52 ++++++++++++-------
.../resources/jdbc/fakesource_to_jdbc.conf | 11 ++--
.../resources/jdbc/jdbcsource_to_console.conf | 6 +--
seatunnel-e2e/seatunnel-spark-e2e/pom.xml | 7 ++-
.../e2e/spark/jdbc/FakeSourceToJdbcIT.java | 42 ++++++++++-----
.../e2e/spark/jdbc/JdbcSourceToConsoleIT.java | 43 ++++++++++-----
.../resources/jdbc/fakesource_to_jdbc.conf | 4 +-
.../resources/jdbc/jdbcsource_to_console.conf | 4 +-
13 files changed, 162 insertions(+), 107 deletions(-)
diff --git a/.idea/vcs.xml b/.idea/vcs.xml
index 59c36d8ab14..e5899791808 100644
--- a/.idea/vcs.xml
+++ b/.idea/vcs.xml
@@ -1,32 +1,16 @@
-
-
-
-
-
+
-
+
+
+
+
\ No newline at end of file
diff --git a/seatunnel-core/seatunnel-core-spark/pom.xml b/seatunnel-core/seatunnel-core-spark/pom.xml
index 21b4630230d..e11948b5116 100644
--- a/seatunnel-core/seatunnel-core-spark/pom.xml
+++ b/seatunnel-core/seatunnel-core-spark/pom.xml
@@ -120,8 +120,8 @@
- mysql
- mysql-connector-java
+ org.postgresql
+ postgresqlcompile
diff --git a/seatunnel-core/seatunnel-flink-starter/pom.xml b/seatunnel-core/seatunnel-flink-starter/pom.xml
index 9253738069a..df48449e987 100644
--- a/seatunnel-core/seatunnel-flink-starter/pom.xml
+++ b/seatunnel-core/seatunnel-flink-starter/pom.xml
@@ -103,17 +103,17 @@
junit
-
- mysql
- mysql-connector-java
- compile
-
-
org.apache.seatunnelconnector-jdbc${project.version}
+
+
+ org.postgresql
+ postgresql
+ compile
+
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml
index 2aeef2f22d5..454a41c9562 100644
--- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml
@@ -39,11 +39,17 @@
org.testcontainers
- mysql
+ postgresql1.17.3test
+
+ org.postgresql
+ postgresql
+ test
+
+
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/FakeSourceToJdbcIT.java b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/FakeSourceToJdbcIT.java
index fe244fda2f3..dc928acc31e 100644
--- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/FakeSourceToJdbcIT.java
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/FakeSourceToJdbcIT.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.seatunnel.e2e.flink.v2.jdbc;
import org.apache.seatunnel.e2e.flink.FlinkContainer;
@@ -10,7 +27,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.Container;
-import org.testcontainers.containers.MySQLContainer;
+import org.testcontainers.containers.PostgreSQLContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.lifecycle.Startables;
import org.testcontainers.utility.DockerImageName;
@@ -26,32 +43,30 @@
public class FakeSourceToJdbcIT extends FlinkContainer {
private static final Logger LOGGER = LoggerFactory.getLogger(FakeSourceToJdbcIT.class);
- private MySQLContainer> mysql;
+ private PostgreSQLContainer> psl;
private Connection connection;
@SuppressWarnings("checkstyle:MagicNumber")
@Before
- public void startMysqlContainer() throws InterruptedException, ClassNotFoundException, SQLException {
- mysql = new MySQLContainer<>(DockerImageName.parse("mysql:8.0"))
+ public void startPostgreSqlContainer() throws InterruptedException, ClassNotFoundException, SQLException {
+ psl = new PostgreSQLContainer<>(DockerImageName.parse("postgres:alpine3.16"))
.withNetwork(NETWORK)
- .withNetworkAliases("jdbc")
+ .withNetworkAliases("postgresql")
.withLogConsumer(new Slf4jLogConsumer(LOGGER));
- mysql.setPortBindings(Lists.newArrayList("3306:3306"));
- Startables.deepStart(Stream.of(mysql)).join();
- LOGGER.info("Jdbc container started");
+ Startables.deepStart(Stream.of(psl)).join();
+ LOGGER.info("PostgreSql container started");
Thread.sleep(5000L);
- Class.forName(mysql.getDriverClassName());
- connection = DriverManager.getConnection(mysql.getJdbcUrl(), mysql.getUsername(), mysql.getPassword());
+ Class.forName(psl.getDriverClassName());
+ connection = DriverManager.getConnection(psl.getJdbcUrl(), psl.getUsername(), psl.getPassword());
initializeJdbcTable();
}
private void initializeJdbcTable() throws SQLException {
Statement statement = connection.createStatement();
String sql = "CREATE TABLE test (\n" +
- " name varchar(255) NOT NULL,\n" +
- " age int NOT NULL\n" +
+ " name varchar(255) NOT NULL\n" +
")";
- statement.executeUpdate(sql);
+ statement.execute(sql);
statement.close();
}
@@ -68,12 +83,13 @@ public void testFakeSourceToJdbcSink() throws SQLException, IOException, Interru
result.add(resultSet.getString("name"));
}
Assert.assertFalse(result.isEmpty());
+ connection.close();
}
@After
- public void closeMysqlContainer() {
- if (mysql != null) {
- mysql.stop();
+ public void closeClickHouseContainer() {
+ if (psl != null) {
+ psl.stop();
}
}
}
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcSourceToConsoleIT.java b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcSourceToConsoleIT.java
index c19d76570a1..9151f001f52 100644
--- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcSourceToConsoleIT.java
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcSourceToConsoleIT.java
@@ -1,8 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.seatunnel.e2e.flink.v2.jdbc;
import org.apache.seatunnel.e2e.flink.FlinkContainer;
-import com.google.common.collect.Lists;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -10,7 +26,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.Container;
-import org.testcontainers.containers.MySQLContainer;
+import org.testcontainers.containers.PostgreSQLContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.lifecycle.Startables;
import org.testcontainers.utility.DockerImageName;
@@ -25,23 +41,21 @@
public class JdbcSourceToConsoleIT extends FlinkContainer {
private static final Logger LOGGER = LoggerFactory.getLogger(JdbcSourceToConsoleIT.class);
- private MySQLContainer> mysql;
+ private PostgreSQLContainer> psl;
private Connection connection;
@SuppressWarnings("checkstyle:MagicNumber")
@Before
- public void startMysqlContainer() throws InterruptedException, ClassNotFoundException, SQLException {
- mysql = new MySQLContainer<>(DockerImageName.parse("mysql:8.0"))
+ public void startPostgreSqlContainer() throws InterruptedException, ClassNotFoundException, SQLException {
+ psl = new PostgreSQLContainer<>(DockerImageName.parse("postgres:alpine3.16"))
.withNetwork(NETWORK)
- .withNetworkAliases("jdbc")
+ .withNetworkAliases("postgresql")
.withLogConsumer(new Slf4jLogConsumer(LOGGER));
- mysql.setPortBindings(Lists.newArrayList("3306:3306"));
- Startables.deepStart(Stream.of(mysql)).join();
- LOGGER.info("Jdbc container started");
- // wait for clickhouse fully start
+ Startables.deepStart(Stream.of(psl)).join();
+ LOGGER.info("PostgreSql container started");
Thread.sleep(5000L);
- Class.forName(mysql.getDriverClassName());
- connection = DriverManager.getConnection(mysql.getJdbcUrl(), mysql.getUsername(), mysql.getPassword());
+ Class.forName(psl.getDriverClassName());
+ connection = DriverManager.getConnection(psl.getJdbcUrl(), psl.getUsername(), psl.getPassword());
initializeJdbcTable();
batchInsertData();
}
@@ -49,21 +63,19 @@ public void startMysqlContainer() throws InterruptedException, ClassNotFoundExce
private void initializeJdbcTable() throws SQLException {
Statement statement = connection.createStatement();
String sql = "CREATE TABLE test (\n" +
- " name varchar(255) NOT NULL,\n" +
- " age int NOT NULL\n" +
+ " name varchar(255) NOT NULL\n" +
")";
- statement.executeUpdate(sql);
+ statement.execute(sql);
statement.close();
}
@SuppressWarnings("checkstyle:MagicNumber")
private void batchInsertData() throws SQLException {
- String sql = "insert into test(name,age) values(?,?)";
+ String sql = "insert into test(name) values(?)";
connection.setAutoCommit(false);
PreparedStatement preparedStatement = connection.prepareStatement(sql);
for (int i = 0; i < 10; i++) {
preparedStatement.setString(1, "Mike");
- preparedStatement.setInt(2, 20);
preparedStatement.addBatch();
}
preparedStatement.executeBatch();
@@ -78,9 +90,9 @@ public void testFakeSourceToJdbcSink() throws SQLException, IOException, Interru
}
@After
- public void closeMysqlContainer() {
- if (mysql != null) {
- mysql.stop();
+ public void closePostgreSqlContainer() {
+ if (psl != null) {
+ psl.stop();
}
}
}
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/jdbc/fakesource_to_jdbc.conf b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/jdbc/fakesource_to_jdbc.conf
index 5a543045d99..9640e19c228 100644
--- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/jdbc/fakesource_to_jdbc.conf
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/jdbc/fakesource_to_jdbc.conf
@@ -30,7 +30,7 @@ source {
# This is a example source plugin **only for test and demonstrate the feature source plugin**
FakeSource {
result_table_name = "fake"
- field_name = "name,age"
+ field_name = "name"
}
# If you would like to get more information about how to configure seatunnel and see full list of source plugins,
@@ -39,7 +39,7 @@ source {
transform {
sql {
- sql = "select name,age from fake"
+ sql = "select name from fake"
}
# If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
@@ -49,12 +49,11 @@ transform {
sink {
Jdbc {
source_table_name = fake
- driver = com.mysql.cj.jdbc.Driver
- url = "jdbc:mysql://jdbc:3306/test"
+ driver = org.postgresql.Driver
+ url = "jdbc:postgresql://postgresql:5432/test?loggerLevel=OFF"
user = test
password = test
- query = "insert into test(name,age) values(?,?)"
- batch_size = 2
+ query = "insert into test(name) values(?)"
}
# If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/jdbc/jdbcsource_to_console.conf b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/jdbc/jdbcsource_to_console.conf
index 77acadb24b5..6862abc04c2 100644
--- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/jdbc/jdbcsource_to_console.conf
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/jdbc/jdbcsource_to_console.conf
@@ -29,10 +29,10 @@ env {
source {
# This is a example source plugin **only for test and demonstrate the feature source plugin**
Jdbc {
- driver = com.mysql.cj.jdbc.Driver
- url = "jdbc:mysql://jdbc:3306/test"
+ driver = org.postgresql.Driver
+ url = "jdbc:postgresql://postgresql:5432/test?loggerLevel=OFF"
user = test
- password = test
+ password = "test"
query = "select * from test"
}
diff --git a/seatunnel-e2e/seatunnel-spark-e2e/pom.xml b/seatunnel-e2e/seatunnel-spark-e2e/pom.xml
index 9b18f701f3e..85a51616522 100644
--- a/seatunnel-e2e/seatunnel-spark-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-spark-e2e/pom.xml
@@ -43,10 +43,15 @@
org.testcontainers
- mysql
+ postgresql1.17.3test
+
+ org.postgresql
+ postgresql
+ test
+
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/jdbc/FakeSourceToJdbcIT.java b/seatunnel-e2e/seatunnel-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/jdbc/FakeSourceToJdbcIT.java
index 83f1d85b1e8..e37c9414f49 100644
--- a/seatunnel-e2e/seatunnel-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/jdbc/FakeSourceToJdbcIT.java
+++ b/seatunnel-e2e/seatunnel-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/jdbc/FakeSourceToJdbcIT.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.seatunnel.e2e.spark.jdbc;
import org.apache.seatunnel.e2e.spark.SparkContainer;
@@ -10,7 +27,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.Container;
-import org.testcontainers.containers.MySQLContainer;
+import org.testcontainers.containers.PostgreSQLContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.lifecycle.Startables;
import org.testcontainers.utility.DockerImageName;
@@ -26,22 +43,21 @@
public class FakeSourceToJdbcIT extends SparkContainer {
private static final Logger LOGGER = LoggerFactory.getLogger(FakeSourceToJdbcIT.class);
- private MySQLContainer> mysql;
+ private PostgreSQLContainer> psl;
private Connection connection;
@SuppressWarnings("checkstyle:MagicNumber")
@Before
- public void startMysqlContainer() throws InterruptedException, ClassNotFoundException, SQLException {
- mysql = new MySQLContainer<>(DockerImageName.parse("mysql:8.0"))
+ public void startPostgreSqlContainer() throws InterruptedException, ClassNotFoundException, SQLException {
+ psl = new PostgreSQLContainer<>(DockerImageName.parse("postgres:alpine3.16"))
.withNetwork(NETWORK)
- .withNetworkAliases("jdbc")
+ .withNetworkAliases("postgresql")
.withLogConsumer(new Slf4jLogConsumer(LOGGER));
- mysql.setPortBindings(Lists.newArrayList("33306:3306"));
- Startables.deepStart(Stream.of(mysql)).join();
- LOGGER.info("Jdbc container started");
+ Startables.deepStart(Stream.of(psl)).join();
+ LOGGER.info("PostgreSql container started");
Thread.sleep(5000L);
- Class.forName(mysql.getDriverClassName());
- connection = DriverManager.getConnection(mysql.getJdbcUrl(), mysql.getUsername(), mysql.getPassword());
+ Class.forName(psl.getDriverClassName());
+ connection = DriverManager.getConnection(psl.getJdbcUrl(), psl.getUsername(), psl.getPassword());
initializeJdbcTable();
}
@@ -69,9 +85,9 @@ public void testFakeSourceToJdbcSink() throws SQLException, IOException, Interru
}
@After
- public void closeMysqlContainer() {
- if (mysql != null) {
- mysql.stop();
+ public void closePostgreSqlContainer() {
+ if (psl != null) {
+ psl.stop();
}
}
}
diff --git a/seatunnel-e2e/seatunnel-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/jdbc/JdbcSourceToConsoleIT.java b/seatunnel-e2e/seatunnel-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/jdbc/JdbcSourceToConsoleIT.java
index c2942d704b7..4ed014e5223 100644
--- a/seatunnel-e2e/seatunnel-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/jdbc/JdbcSourceToConsoleIT.java
+++ b/seatunnel-e2e/seatunnel-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/jdbc/JdbcSourceToConsoleIT.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.seatunnel.e2e.spark.jdbc;
import org.apache.seatunnel.e2e.spark.SparkContainer;
@@ -10,7 +27,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.Container;
-import org.testcontainers.containers.MySQLContainer;
+import org.testcontainers.containers.PostgreSQLContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.lifecycle.Startables;
import org.testcontainers.utility.DockerImageName;
@@ -25,22 +42,22 @@
public class JdbcSourceToConsoleIT extends SparkContainer {
private static final Logger LOGGER = LoggerFactory.getLogger(JdbcSourceToConsoleIT.class);
- private MySQLContainer> mysql;
+ private PostgreSQLContainer> psl;
private Connection connection;
@SuppressWarnings("checkstyle:MagicNumber")
@Before
- public void startMysqlContainer() throws InterruptedException, ClassNotFoundException, SQLException {
- mysql = new MySQLContainer<>(DockerImageName.parse("mysql:8.0"))
+ public void startPostgreSqlContainer() throws InterruptedException, ClassNotFoundException, SQLException {
+ psl = new PostgreSQLContainer<>(DockerImageName.parse("postgres:alpine3.16"))
.withNetwork(NETWORK)
- .withNetworkAliases("jdbc")
+ .withNetworkAliases("postgresql")
.withLogConsumer(new Slf4jLogConsumer(LOGGER));
- mysql.setPortBindings(Lists.newArrayList("33306:3306"));
- Startables.deepStart(Stream.of(mysql)).join();
- LOGGER.info("Jdbc container started");
+ psl.setPortBindings(Lists.newArrayList("33306:3306"));
+ Startables.deepStart(Stream.of(psl)).join();
+ LOGGER.info("PostgreSql container started");
Thread.sleep(5000L);
- Class.forName(mysql.getDriverClassName());
- connection = DriverManager.getConnection(mysql.getJdbcUrl(), mysql.getUsername(), mysql.getPassword());
+ Class.forName(psl.getDriverClassName());
+ connection = DriverManager.getConnection(psl.getJdbcUrl(), psl.getUsername(), psl.getPassword());
initializeJdbcTable();
batchInsertData();
}
@@ -77,9 +94,9 @@ public void testFakeSourceToJdbcSink() throws SQLException, IOException, Interru
}
@After
- public void closeMysqlContainer() {
- if (mysql != null) {
- mysql.stop();
+ public void closePostgreSqlContainer() {
+ if (psl != null) {
+ psl.stop();
}
}
}
diff --git a/seatunnel-e2e/seatunnel-spark-e2e/src/test/resources/jdbc/fakesource_to_jdbc.conf b/seatunnel-e2e/seatunnel-spark-e2e/src/test/resources/jdbc/fakesource_to_jdbc.conf
index 598efd7b8b2..8aae4fe31c5 100644
--- a/seatunnel-e2e/seatunnel-spark-e2e/src/test/resources/jdbc/fakesource_to_jdbc.conf
+++ b/seatunnel-e2e/seatunnel-spark-e2e/src/test/resources/jdbc/fakesource_to_jdbc.conf
@@ -43,10 +43,10 @@ transform {
sink {
jdbc {
- driver = com.mysql.cj.jdbc.Driver
+ driver = org.postgresql.Driver
saveMode = "update",
truncate = "true",
- url = "jdbc:mysql://jdbc:3306/test",
+ url = "jdbc:postgresql://postgresql:5432/test?loggerLevel=OFF",
user = "test",
password = "test",
dbTable = "fake",
diff --git a/seatunnel-e2e/seatunnel-spark-e2e/src/test/resources/jdbc/jdbcsource_to_console.conf b/seatunnel-e2e/seatunnel-spark-e2e/src/test/resources/jdbc/jdbcsource_to_console.conf
index 1ad856e0135..d9555f20be5 100644
--- a/seatunnel-e2e/seatunnel-spark-e2e/src/test/resources/jdbc/jdbcsource_to_console.conf
+++ b/seatunnel-e2e/seatunnel-spark-e2e/src/test/resources/jdbc/jdbcsource_to_console.conf
@@ -29,8 +29,8 @@ env {
source {
# This is a example source plugin **only for test and demonstrate the feature source plugin**
jdbc {
- driver = com.mysql.cj.jdbc.Driver
- url = "jdbc:mysql://jdbc:3306/test"
+ driver = org.postgresql.Driver
+ url = "jdbc:postgresql://postgresql:5432/test?loggerLevel=OFF",
table = "test"
result_table_name = "test_log"
user = "test"
From 27467bd7012719b05bb8a22b2976f380de6a667d Mon Sep 17 00:00:00 2001
From: zhangyuge1 <1728375982@qq.com>
Date: Tue, 26 Jul 2022 19:29:27 +0800
Subject: [PATCH 03/17] add pdjdbc license
---
LICENSE | 5 +++-
seatunnel-dist/release-docs/LICENSE | 1 +
.../release-docs/licenses/LICENSE-pgjdbc.txt | 23 +++++++++++++++++++
tools/dependencies/known-dependencies.txt | 3 ++-
4 files changed, 30 insertions(+), 2 deletions(-)
create mode 100644 seatunnel-dist/release-docs/licenses/LICENSE-pgjdbc.txt
diff --git a/LICENSE b/LICENSE
index ca1720aba68..0592259a9b6 100644
--- a/LICENSE
+++ b/LICENSE
@@ -215,4 +215,7 @@ seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade
seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/impl/Path.java from https://github.com/lightbend/config
seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/impl/PathParser.java from https://github.com/lightbend/config
seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/ConfigParseOptions.java from https://github.com/lightbend/config
-seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/SimpleConfigObject.java from https://github.com/lightbend/config
\ No newline at end of file
+seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/SimpleConfigObject.java from https://github.com/lightbend/config
+
+This product bundles PostgreSQL JDBC 4.2 Driver, which is available under a
+"BSD 2-Clause Simplified" license. For details, see https://jdbc.postgresql.org/.
\ No newline at end of file
diff --git a/seatunnel-dist/release-docs/LICENSE b/seatunnel-dist/release-docs/LICENSE
index 8191c55a083..93bd61564e3 100644
--- a/seatunnel-dist/release-docs/LICENSE
+++ b/seatunnel-dist/release-docs/LICENSE
@@ -1044,6 +1044,7 @@ The text of each license is also included at licenses/LICENSE-[project].txt.
(The BSD License) xmlenc Library (xmlenc:xmlenc:0.52 - http://xmlenc.sourceforge.net)
(The BSD License) ASM Core (asm:asm:3.1 - http://asm.objectweb.org/asm/)
(New BSD License) Janino (org.codehaus.janino:janino:3.1.6 - http://docs.codehaus.org/display/JANINO/Home/janino)
+ (BSD 2-Clause License) pgjdbc (org.postgresql:postgresql:42.3.3 - https://jdbc.postgresql.org)
========================================================================
diff --git a/seatunnel-dist/release-docs/licenses/LICENSE-pgjdbc.txt b/seatunnel-dist/release-docs/licenses/LICENSE-pgjdbc.txt
new file mode 100644
index 00000000000..98dff7b6ee1
--- /dev/null
+++ b/seatunnel-dist/release-docs/licenses/LICENSE-pgjdbc.txt
@@ -0,0 +1,23 @@
+Copyright (c) 1997, PostgreSQL Global Development Group
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are met:
+
+1. Redistributions of source code must retain the above copyright notice,
+ this list of conditions and the following disclaimer.
+2. Redistributions in binary form must reproduce the above copyright notice,
+ this list of conditions and the following disclaimer in the documentation
+ and/or other materials provided with the distribution.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+POSSIBILITY OF SUCH DAMAGE.
\ No newline at end of file
diff --git a/tools/dependencies/known-dependencies.txt b/tools/dependencies/known-dependencies.txt
index f2693cd704a..2eaab117502 100755
--- a/tools/dependencies/known-dependencies.txt
+++ b/tools/dependencies/known-dependencies.txt
@@ -732,4 +732,5 @@ zookeeper-jute-3.5.9.jar
zstd-jni-1.3.3-1.jar
zstd-jni-1.4.3-1.jar
jakarta.activation-api-1.2.1.jar
-jakarta.xml.bind-api-2.3.2.jar
\ No newline at end of file
+jakarta.xml.bind-api-2.3.2.jar
+postgresql-42.3.3.jar
\ No newline at end of file
From c9e6d3b9610cd2c07b71fdfb8d8772ee806dfaaa Mon Sep 17 00:00:00 2001
From: zhangyuge1 <1728375982@qq.com>
Date: Tue, 26 Jul 2022 19:35:19 +0800
Subject: [PATCH 04/17] add pgjdbc license
---
LICENSE | 5 +----
1 file changed, 1 insertion(+), 4 deletions(-)
diff --git a/LICENSE b/LICENSE
index 0592259a9b6..ca1720aba68 100644
--- a/LICENSE
+++ b/LICENSE
@@ -215,7 +215,4 @@ seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade
seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/impl/Path.java from https://github.com/lightbend/config
seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/impl/PathParser.java from https://github.com/lightbend/config
seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/ConfigParseOptions.java from https://github.com/lightbend/config
-seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/SimpleConfigObject.java from https://github.com/lightbend/config
-
-This product bundles PostgreSQL JDBC 4.2 Driver, which is available under a
-"BSD 2-Clause Simplified" license. For details, see https://jdbc.postgresql.org/.
\ No newline at end of file
+seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/SimpleConfigObject.java from https://github.com/lightbend/config
\ No newline at end of file
From ee90a05166132bbeddd5cb8db6b9d3f70285ee9a Mon Sep 17 00:00:00 2001
From: zhangyuge1 <1728375982@qq.com>
Date: Tue, 26 Jul 2022 20:02:22 +0800
Subject: [PATCH 05/17] fix vcs.xml
---
.idea/vcs.xml | 22 +++++++++++++++++++---
1 file changed, 19 insertions(+), 3 deletions(-)
diff --git a/.idea/vcs.xml b/.idea/vcs.xml
index e5899791808..bee7eaa5380 100644
--- a/.idea/vcs.xml
+++ b/.idea/vcs.xml
@@ -1,5 +1,24 @@
+
+
+
+
@@ -10,7 +29,4 @@
-
-
-
\ No newline at end of file
From de8607affd5169f3d5c7caf7fa7e57d93facc7cb Mon Sep 17 00:00:00 2001
From: zhangyuge1 <1728375982@qq.com>
Date: Wed, 27 Jul 2022 15:47:13 +0800
Subject: [PATCH 06/17] add checker-qual-3.5.0 license
---
seatunnel-dist/release-docs/LICENSE | 1 +
tools/dependencies/known-dependencies.txt | 3 ++-
2 files changed, 3 insertions(+), 1 deletion(-)
diff --git a/seatunnel-dist/release-docs/LICENSE b/seatunnel-dist/release-docs/LICENSE
index 93bd61564e3..4dd33f1732c 100644
--- a/seatunnel-dist/release-docs/LICENSE
+++ b/seatunnel-dist/release-docs/LICENSE
@@ -973,6 +973,7 @@ The text of each license is also included at licenses/LICENSE-[project].txt.
(The MIT License (MIT)) influxdb java bindings (org.influxdb:influxdb-java:2.22 - http://www.influxdb.org)
(The MIT License) Checker Qual (org.checkerframework:checker-qual:3.10.0 - https://checkerframework.org)
(The MIT License) Checker Qual (org.checkerframework:checker-qual:3.4.0 - https://checkerframework.org)
+ (The MIT License) Checker Qual (org.checkerframework:checker-qual:3.5.0 - https://checkerframework.org)
(The MIT License) JOpt Simple (net.sf.jopt-simple:jopt-simple:5.0.2 - http://pholser.github.io/jopt-simple)
diff --git a/tools/dependencies/known-dependencies.txt b/tools/dependencies/known-dependencies.txt
index 2eaab117502..71389f04969 100755
--- a/tools/dependencies/known-dependencies.txt
+++ b/tools/dependencies/known-dependencies.txt
@@ -733,4 +733,5 @@ zstd-jni-1.3.3-1.jar
zstd-jni-1.4.3-1.jar
jakarta.activation-api-1.2.1.jar
jakarta.xml.bind-api-2.3.2.jar
-postgresql-42.3.3.jar
\ No newline at end of file
+postgresql-42.3.3.jar
+checker-qual-3.5.0.jar
\ No newline at end of file
From 0cb616e0862c207d5cb8f39bac19892742727b96 Mon Sep 17 00:00:00 2001
From: zhangyuge1 <1728375982@qq.com>
Date: Sat, 30 Jul 2022 21:19:25 +0800
Subject: [PATCH 07/17] fix dependency issue
---
.idea/vcs.xml | 6 +--
pom.xml | 8 +++-
.../connector-jdbc/pom.xml | 1 -
.../seatunnel-connector-spark-jdbc/pom.xml | 4 ++
seatunnel-core/seatunnel-core-spark/pom.xml | 6 ---
.../seatunnel-flink-starter/pom.xml | 12 -----
seatunnel-dist/release-docs/LICENSE | 2 -
.../e2e/flink/v2/jdbc/FakeSourceToJdbcIT.java | 34 +++++++-------
.../flink/v2/jdbc/JdbcSourceToConsoleIT.java | 40 +++++++++--------
.../e2e/spark/jdbc/FakeSourceToJdbcIT.java | 33 +++++++-------
.../e2e/spark/jdbc/JdbcSourceToConsoleIT.java | 44 ++++++++++---------
tools/dependencies/known-dependencies.txt | 4 +-
12 files changed, 97 insertions(+), 97 deletions(-)
diff --git a/.idea/vcs.xml b/.idea/vcs.xml
index bee7eaa5380..59c36d8ab14 100644
--- a/.idea/vcs.xml
+++ b/.idea/vcs.xml
@@ -19,14 +19,14 @@
-
+
-
+
-
\ No newline at end of file
+
diff --git a/pom.xml b/pom.xml
index 6f4eac23426..8157089ba66 100644
--- a/pom.xml
+++ b/pom.xml
@@ -219,6 +219,7 @@
2.6.11.5.106.2.2.Final
+ 3.10.0
@@ -333,7 +334,6 @@
org.postgresqlpostgresql${postgresql.version}
- test
@@ -922,6 +922,12 @@
hibernate-validator${hibernate.validator.version}
+
+
+ org.checkerframework
+ checker-qual
+ ${checker.qual.version}
+
diff --git a/seatunnel-connectors-v2/connector-jdbc/pom.xml b/seatunnel-connectors-v2/connector-jdbc/pom.xml
index 39839eba700..1bb972ac96c 100644
--- a/seatunnel-connectors-v2/connector-jdbc/pom.xml
+++ b/seatunnel-connectors-v2/connector-jdbc/pom.xml
@@ -46,7 +46,6 @@
org.postgresqlpostgresql
- provided
diff --git a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-jdbc/pom.xml b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-jdbc/pom.xml
index 5bcc57ac4b0..485381fb8d4 100644
--- a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-jdbc/pom.xml
+++ b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-jdbc/pom.xml
@@ -56,6 +56,10 @@
spark-streaming_${scala.binary.version}
+
+ org.postgresql
+ postgresql
+
\ No newline at end of file
diff --git a/seatunnel-core/seatunnel-core-spark/pom.xml b/seatunnel-core/seatunnel-core-spark/pom.xml
index e11948b5116..5c1264b1850 100644
--- a/seatunnel-core/seatunnel-core-spark/pom.xml
+++ b/seatunnel-core/seatunnel-core-spark/pom.xml
@@ -118,12 +118,6 @@
seatunnel-transform-spark-null-rate${project.version}
-
-
- org.postgresql
- postgresql
- compile
-
diff --git a/seatunnel-core/seatunnel-flink-starter/pom.xml b/seatunnel-core/seatunnel-flink-starter/pom.xml
index df48449e987..f42676c72d2 100644
--- a/seatunnel-core/seatunnel-flink-starter/pom.xml
+++ b/seatunnel-core/seatunnel-flink-starter/pom.xml
@@ -102,18 +102,6 @@
junitjunit
-
-
- org.apache.seatunnel
- connector-jdbc
- ${project.version}
-
-
-
- org.postgresql
- postgresql
- compile
-
diff --git a/seatunnel-dist/release-docs/LICENSE b/seatunnel-dist/release-docs/LICENSE
index 4dd33f1732c..03ed623bf9b 100644
--- a/seatunnel-dist/release-docs/LICENSE
+++ b/seatunnel-dist/release-docs/LICENSE
@@ -972,8 +972,6 @@ The text of each license is also included at licenses/LICENSE-[project].txt.
(MIT-License) spoiwo (com.norbitltd:spoiwo_2.11:1.8.0 - https://github.com/norbert-radyk/spoiwo/)
(The MIT License (MIT)) influxdb java bindings (org.influxdb:influxdb-java:2.22 - http://www.influxdb.org)
(The MIT License) Checker Qual (org.checkerframework:checker-qual:3.10.0 - https://checkerframework.org)
- (The MIT License) Checker Qual (org.checkerframework:checker-qual:3.4.0 - https://checkerframework.org)
- (The MIT License) Checker Qual (org.checkerframework:checker-qual:3.5.0 - https://checkerframework.org)
(The MIT License) JOpt Simple (net.sf.jopt-simple:jopt-simple:5.0.2 - http://pholser.github.io/jopt-simple)
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/FakeSourceToJdbcIT.java b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/FakeSourceToJdbcIT.java
index dc928acc31e..dca1e5f7d95 100644
--- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/FakeSourceToJdbcIT.java
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/FakeSourceToJdbcIT.java
@@ -44,7 +44,6 @@
public class FakeSourceToJdbcIT extends FlinkContainer {
private static final Logger LOGGER = LoggerFactory.getLogger(FakeSourceToJdbcIT.class);
private PostgreSQLContainer> psl;
- private Connection connection;
@SuppressWarnings("checkstyle:MagicNumber")
@Before
@@ -57,17 +56,19 @@ public void startPostgreSqlContainer() throws InterruptedException, ClassNotFoun
LOGGER.info("PostgreSql container started");
Thread.sleep(5000L);
Class.forName(psl.getDriverClassName());
- connection = DriverManager.getConnection(psl.getJdbcUrl(), psl.getUsername(), psl.getPassword());
initializeJdbcTable();
}
- private void initializeJdbcTable() throws SQLException {
- Statement statement = connection.createStatement();
- String sql = "CREATE TABLE test (\n" +
- " name varchar(255) NOT NULL\n" +
- ")";
- statement.execute(sql);
- statement.close();
+ private void initializeJdbcTable() {
+ try (Connection connection = DriverManager.getConnection(psl.getJdbcUrl(), psl.getUsername(), psl.getPassword())) {
+ Statement statement = connection.createStatement();
+ String sql = "CREATE TABLE test (\n" +
+ " name varchar(255) NOT NULL\n" +
+ ")";
+ statement.execute(sql);
+ } catch (SQLException e) {
+ throw new RuntimeException("Initializing PostgreSql table failed!", e);
+ }
}
@Test
@@ -76,14 +77,15 @@ public void testFakeSourceToJdbcSink() throws SQLException, IOException, Interru
Assert.assertEquals(0, execResult.getExitCode());
// query result
String sql = "select * from test";
- Statement statement = connection.createStatement();
- ResultSet resultSet = statement.executeQuery(sql);
- List result = Lists.newArrayList();
- while (resultSet.next()) {
- result.add(resultSet.getString("name"));
+ try (Connection connection = DriverManager.getConnection(psl.getJdbcUrl(), psl.getUsername(), psl.getPassword())) {
+ Statement statement = connection.createStatement();
+ ResultSet resultSet = statement.executeQuery(sql);
+ List result = Lists.newArrayList();
+ while (resultSet.next()) {
+ result.add(resultSet.getString("name"));
+ }
+ Assert.assertFalse(result.isEmpty());
}
- Assert.assertFalse(result.isEmpty());
- connection.close();
}
@After
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcSourceToConsoleIT.java b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcSourceToConsoleIT.java
index 9151f001f52..195922aa033 100644
--- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcSourceToConsoleIT.java
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcSourceToConsoleIT.java
@@ -42,7 +42,6 @@
public class JdbcSourceToConsoleIT extends FlinkContainer {
private static final Logger LOGGER = LoggerFactory.getLogger(JdbcSourceToConsoleIT.class);
private PostgreSQLContainer> psl;
- private Connection connection;
@SuppressWarnings("checkstyle:MagicNumber")
@Before
@@ -55,32 +54,37 @@ public void startPostgreSqlContainer() throws InterruptedException, ClassNotFoun
LOGGER.info("PostgreSql container started");
Thread.sleep(5000L);
Class.forName(psl.getDriverClassName());
- connection = DriverManager.getConnection(psl.getJdbcUrl(), psl.getUsername(), psl.getPassword());
initializeJdbcTable();
batchInsertData();
}
- private void initializeJdbcTable() throws SQLException {
- Statement statement = connection.createStatement();
- String sql = "CREATE TABLE test (\n" +
- " name varchar(255) NOT NULL\n" +
- ")";
- statement.execute(sql);
- statement.close();
+ private void initializeJdbcTable() {
+ try (Connection connection = DriverManager.getConnection(psl.getJdbcUrl(), psl.getUsername(), psl.getPassword())) {
+ Statement statement = connection.createStatement();
+ String sql = "CREATE TABLE test (\n" +
+ " name varchar(255) NOT NULL\n" +
+ ")";
+ statement.execute(sql);
+ } catch (SQLException e) {
+ throw new RuntimeException("Initializing PostgreSql table failed!", e);
+ }
}
@SuppressWarnings("checkstyle:MagicNumber")
private void batchInsertData() throws SQLException {
- String sql = "insert into test(name) values(?)";
- connection.setAutoCommit(false);
- PreparedStatement preparedStatement = connection.prepareStatement(sql);
- for (int i = 0; i < 10; i++) {
- preparedStatement.setString(1, "Mike");
- preparedStatement.addBatch();
+ try (Connection connection = DriverManager.getConnection(psl.getJdbcUrl(), psl.getUsername(), psl.getPassword())) {
+ String sql = "insert into test(name) values(?)";
+ connection.setAutoCommit(false);
+ PreparedStatement preparedStatement = connection.prepareStatement(sql);
+ for (int i = 0; i < 10; i++) {
+ preparedStatement.setString(1, "Mike");
+ preparedStatement.addBatch();
+ }
+ preparedStatement.executeBatch();
+ connection.commit();
+ } catch (SQLException e) {
+ throw new RuntimeException("Batch insert data failed!", e);
}
- preparedStatement.executeBatch();
- connection.commit();
- connection.close();
}
@Test
diff --git a/seatunnel-e2e/seatunnel-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/jdbc/FakeSourceToJdbcIT.java b/seatunnel-e2e/seatunnel-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/jdbc/FakeSourceToJdbcIT.java
index e37c9414f49..2ab5dac3596 100644
--- a/seatunnel-e2e/seatunnel-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/jdbc/FakeSourceToJdbcIT.java
+++ b/seatunnel-e2e/seatunnel-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/jdbc/FakeSourceToJdbcIT.java
@@ -44,7 +44,6 @@
public class FakeSourceToJdbcIT extends SparkContainer {
private static final Logger LOGGER = LoggerFactory.getLogger(FakeSourceToJdbcIT.class);
private PostgreSQLContainer> psl;
- private Connection connection;
@SuppressWarnings("checkstyle:MagicNumber")
@Before
@@ -57,17 +56,19 @@ public void startPostgreSqlContainer() throws InterruptedException, ClassNotFoun
LOGGER.info("PostgreSql container started");
Thread.sleep(5000L);
Class.forName(psl.getDriverClassName());
- connection = DriverManager.getConnection(psl.getJdbcUrl(), psl.getUsername(), psl.getPassword());
initializeJdbcTable();
}
- private void initializeJdbcTable() throws SQLException {
- Statement statement = connection.createStatement();
- String sql = "CREATE TABLE test (\n" +
- " name varchar(255) NOT NULL\n" +
- ")";
- statement.executeUpdate(sql);
- statement.close();
+ private void initializeJdbcTable() {
+ try (Connection connection = DriverManager.getConnection(psl.getJdbcUrl(), psl.getUsername(), psl.getPassword())) {
+ Statement statement = connection.createStatement();
+ String sql = "CREATE TABLE test (\n" +
+ " name varchar(255) NOT NULL\n" +
+ ")";
+ statement.execute(sql);
+ } catch (SQLException e) {
+ throw new RuntimeException("Initializing PostgreSql table failed!", e);
+ }
}
@Test
@@ -75,13 +76,15 @@ public void testFakeSourceToJdbcSink() throws SQLException, IOException, Interru
Container.ExecResult execResult = executeSeaTunnelSparkJob("/jdbc/fakesource_to_jdbc.conf");
Assert.assertEquals(0, execResult.getExitCode());
String sql = "select * from test";
- Statement statement = connection.createStatement();
- ResultSet resultSet = statement.executeQuery(sql);
- List result = Lists.newArrayList();
- while (resultSet.next()) {
- result.add(resultSet.getString("name"));
+ try (Connection connection = DriverManager.getConnection(psl.getJdbcUrl(), psl.getUsername(), psl.getPassword())) {
+ Statement statement = connection.createStatement();
+ ResultSet resultSet = statement.executeQuery(sql);
+ List result = Lists.newArrayList();
+ while (resultSet.next()) {
+ result.add(resultSet.getString("name"));
+ }
+ Assert.assertFalse(result.isEmpty());
}
- Assert.assertFalse(result.isEmpty());
}
@After
diff --git a/seatunnel-e2e/seatunnel-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/jdbc/JdbcSourceToConsoleIT.java b/seatunnel-e2e/seatunnel-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/jdbc/JdbcSourceToConsoleIT.java
index 4ed014e5223..aacea6d64d3 100644
--- a/seatunnel-e2e/seatunnel-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/jdbc/JdbcSourceToConsoleIT.java
+++ b/seatunnel-e2e/seatunnel-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/jdbc/JdbcSourceToConsoleIT.java
@@ -43,7 +43,6 @@
public class JdbcSourceToConsoleIT extends SparkContainer {
private static final Logger LOGGER = LoggerFactory.getLogger(JdbcSourceToConsoleIT.class);
private PostgreSQLContainer> psl;
- private Connection connection;
@SuppressWarnings("checkstyle:MagicNumber")
@Before
@@ -57,34 +56,39 @@ public void startPostgreSqlContainer() throws InterruptedException, ClassNotFoun
LOGGER.info("PostgreSql container started");
Thread.sleep(5000L);
Class.forName(psl.getDriverClassName());
- connection = DriverManager.getConnection(psl.getJdbcUrl(), psl.getUsername(), psl.getPassword());
initializeJdbcTable();
batchInsertData();
}
- private void initializeJdbcTable() throws SQLException {
- Statement statement = connection.createStatement();
- String sql = "CREATE TABLE test (\n" +
- " name varchar(255) NOT NULL,\n" +
- " age int NOT NULL\n" +
- ")";
- statement.executeUpdate(sql);
- statement.close();
+ private void initializeJdbcTable() {
+ try (Connection connection = DriverManager.getConnection(psl.getJdbcUrl(), psl.getUsername(), psl.getPassword())) {
+ Statement statement = connection.createStatement();
+ String sql = "CREATE TABLE test (\n" +
+ " name varchar(255) NOT NULL,\n" +
+ " age int NOT NULL\n" +
+ ")";
+ statement.executeUpdate(sql);
+ } catch (SQLException e) {
+ throw new RuntimeException("Initializing PostgreSql table failed!", e);
+ }
}
@SuppressWarnings("checkstyle:MagicNumber")
private void batchInsertData() throws SQLException {
- String sql = "insert into test(name,age) values(?,?)";
- connection.setAutoCommit(false);
- PreparedStatement preparedStatement = connection.prepareStatement(sql);
- for (int i = 0; i < 10; i++) {
- preparedStatement.setString(1, "Mike");
- preparedStatement.setInt(2, 20);
- preparedStatement.addBatch();
+ try (Connection connection = DriverManager.getConnection(psl.getJdbcUrl(), psl.getUsername(), psl.getPassword())) {
+ String sql = "insert into test(name,age) values(?,?)";
+ connection.setAutoCommit(false);
+ PreparedStatement preparedStatement = connection.prepareStatement(sql);
+ for (int i = 0; i < 10; i++) {
+ preparedStatement.setString(1, "Mike");
+ preparedStatement.setInt(2, 20);
+ preparedStatement.addBatch();
+ }
+ preparedStatement.executeBatch();
+ connection.commit();
+ } catch (SQLException e) {
+ throw new RuntimeException("Batch insert data failed!", e);
}
- preparedStatement.executeBatch();
- connection.commit();
- connection.close();
}
@Test
diff --git a/tools/dependencies/known-dependencies.txt b/tools/dependencies/known-dependencies.txt
index 71389f04969..7b585184d40 100755
--- a/tools/dependencies/known-dependencies.txt
+++ b/tools/dependencies/known-dependencies.txt
@@ -55,7 +55,6 @@ calcite-core-1.29.0.jar
calcite-druid-1.29.0.jar
calcite-linq4j-1.29.0.jar
checker-qual-3.10.0.jar
-checker-qual-3.4.0.jar
chill-java-0.9.3.jar
chill_2.11-0.9.3.jar
classmate-1.1.0.jar
@@ -733,5 +732,4 @@ zstd-jni-1.3.3-1.jar
zstd-jni-1.4.3-1.jar
jakarta.activation-api-1.2.1.jar
jakarta.xml.bind-api-2.3.2.jar
-postgresql-42.3.3.jar
-checker-qual-3.5.0.jar
\ No newline at end of file
+postgresql-42.3.3.jar
\ No newline at end of file
From b5b918b93fd8bf3980019441e44d28b129d93ad0 Mon Sep 17 00:00:00 2001
From: zhangyuge1 <1728375982@qq.com>
Date: Sat, 30 Jul 2022 21:30:02 +0800
Subject: [PATCH 08/17] solve conflicts
---
pom.xml | 11 +++++++++++
1 file changed, 11 insertions(+)
diff --git a/pom.xml b/pom.xml
index 8157089ba66..5edb124eef8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -124,6 +124,7 @@
1.13.60.11.11.5.6
+ 1.8.22.3.91.21.9.4
@@ -214,11 +215,14 @@
4.3.01.1.8.32.6.8
+ 5.3.202.2.21.2.92.6.11.5.106.2.2.Final
+ 1.14.3
+ 1.3.23.10.0
@@ -923,6 +927,13 @@
${hibernate.validator.version}
+
+
+ org.jsoup
+ jsoup
+ ${jsoup.version}
+
+
org.checkerframeworkchecker-qual
From b43668444c27b08ce9a021b1efb2986676a6f670 Mon Sep 17 00:00:00 2001
From: zhangyuge1 <1728375982@qq.com>
Date: Sun, 24 Jul 2022 16:59:05 +0800
Subject: [PATCH 09/17] add jdbc e2e test
---
seatunnel-core/seatunnel-core-spark/pom.xml | 6 ++
.../seatunnel-flink-starter/pom.xml | 11 +++
.../seatunnel-flink-connector-v2-e2e/pom.xml | 7 ++
.../e2e/flink/v2/jdbc/FakeSourceToJdbcIT.java | 79 +++++++++++++++++
.../flink/v2/jdbc/JdbcSourceToConsoleIT.java | 86 +++++++++++++++++++
.../resources/jdbc/fakesource_to_jdbc.conf | 62 +++++++++++++
.../resources/jdbc/jdbcsource_to_console.conf | 53 ++++++++++++
seatunnel-e2e/seatunnel-spark-e2e/pom.xml | 6 ++
.../e2e/spark/jdbc/FakeSourceToJdbcIT.java | 77 +++++++++++++++++
.../e2e/spark/jdbc/JdbcSourceToConsoleIT.java | 85 ++++++++++++++++++
.../resources/jdbc/fakesource_to_jdbc.conf | 60 +++++++++++++
.../resources/jdbc/jdbcsource_to_console.conf | 53 ++++++++++++
12 files changed, 585 insertions(+)
create mode 100644 seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/FakeSourceToJdbcIT.java
create mode 100644 seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcSourceToConsoleIT.java
create mode 100644 seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/jdbc/fakesource_to_jdbc.conf
create mode 100644 seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/jdbc/jdbcsource_to_console.conf
create mode 100644 seatunnel-e2e/seatunnel-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/jdbc/FakeSourceToJdbcIT.java
create mode 100644 seatunnel-e2e/seatunnel-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/jdbc/JdbcSourceToConsoleIT.java
create mode 100644 seatunnel-e2e/seatunnel-spark-e2e/src/test/resources/jdbc/fakesource_to_jdbc.conf
create mode 100644 seatunnel-e2e/seatunnel-spark-e2e/src/test/resources/jdbc/jdbcsource_to_console.conf
diff --git a/seatunnel-core/seatunnel-core-spark/pom.xml b/seatunnel-core/seatunnel-core-spark/pom.xml
index 5c1264b1850..21b4630230d 100644
--- a/seatunnel-core/seatunnel-core-spark/pom.xml
+++ b/seatunnel-core/seatunnel-core-spark/pom.xml
@@ -118,6 +118,12 @@
seatunnel-transform-spark-null-rate${project.version}
+
+
+ mysql
+ mysql-connector-java
+ compile
+
diff --git a/seatunnel-core/seatunnel-flink-starter/pom.xml b/seatunnel-core/seatunnel-flink-starter/pom.xml
index 6385b9c41f6..9253738069a 100644
--- a/seatunnel-core/seatunnel-flink-starter/pom.xml
+++ b/seatunnel-core/seatunnel-flink-starter/pom.xml
@@ -103,6 +103,17 @@
junit
+
+ mysql
+ mysql-connector-java
+ compile
+
+
+
+ org.apache.seatunnel
+ connector-jdbc
+ ${project.version}
+
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml
index c494f80f886..2aeef2f22d5 100644
--- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml
@@ -37,6 +37,13 @@
testcontainers
+
+ org.testcontainers
+ mysql
+ 1.17.3
+ test
+
+
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/FakeSourceToJdbcIT.java b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/FakeSourceToJdbcIT.java
new file mode 100644
index 00000000000..fe244fda2f3
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/FakeSourceToJdbcIT.java
@@ -0,0 +1,79 @@
+package org.apache.seatunnel.e2e.flink.v2.jdbc;
+
+import org.apache.seatunnel.e2e.flink.FlinkContainer;
+
+import com.google.common.collect.Lists;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.MySQLContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerImageName;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.List;
+import java.util.stream.Stream;
+
+public class FakeSourceToJdbcIT extends FlinkContainer {
+ private static final Logger LOGGER = LoggerFactory.getLogger(FakeSourceToJdbcIT.class);
+ private MySQLContainer> mysql;
+ private Connection connection;
+
+ @SuppressWarnings("checkstyle:MagicNumber")
+ @Before
+ public void startMysqlContainer() throws InterruptedException, ClassNotFoundException, SQLException {
+ mysql = new MySQLContainer<>(DockerImageName.parse("mysql:8.0"))
+ .withNetwork(NETWORK)
+ .withNetworkAliases("jdbc")
+ .withLogConsumer(new Slf4jLogConsumer(LOGGER));
+ mysql.setPortBindings(Lists.newArrayList("3306:3306"));
+ Startables.deepStart(Stream.of(mysql)).join();
+ LOGGER.info("Jdbc container started");
+ Thread.sleep(5000L);
+ Class.forName(mysql.getDriverClassName());
+ connection = DriverManager.getConnection(mysql.getJdbcUrl(), mysql.getUsername(), mysql.getPassword());
+ initializeJdbcTable();
+ }
+
+ private void initializeJdbcTable() throws SQLException {
+ Statement statement = connection.createStatement();
+ String sql = "CREATE TABLE test (\n" +
+ " name varchar(255) NOT NULL,\n" +
+ " age int NOT NULL\n" +
+ ")";
+ statement.executeUpdate(sql);
+ statement.close();
+ }
+
+ @Test
+ public void testFakeSourceToJdbcSink() throws SQLException, IOException, InterruptedException {
+ Container.ExecResult execResult = executeSeaTunnelFlinkJob("/jdbc/fakesource_to_jdbc.conf");
+ Assert.assertEquals(0, execResult.getExitCode());
+ // query result
+ String sql = "select * from test";
+ Statement statement = connection.createStatement();
+ ResultSet resultSet = statement.executeQuery(sql);
+ List result = Lists.newArrayList();
+ while (resultSet.next()) {
+ result.add(resultSet.getString("name"));
+ }
+ Assert.assertFalse(result.isEmpty());
+ }
+
+ @After
+ public void closeMysqlContainer() {
+ if (mysql != null) {
+ mysql.stop();
+ }
+ }
+}
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcSourceToConsoleIT.java b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcSourceToConsoleIT.java
new file mode 100644
index 00000000000..c19d76570a1
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcSourceToConsoleIT.java
@@ -0,0 +1,86 @@
+package org.apache.seatunnel.e2e.flink.v2.jdbc;
+
+import org.apache.seatunnel.e2e.flink.FlinkContainer;
+
+import com.google.common.collect.Lists;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.MySQLContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerImageName;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.stream.Stream;
+
+public class JdbcSourceToConsoleIT extends FlinkContainer {
+ private static final Logger LOGGER = LoggerFactory.getLogger(JdbcSourceToConsoleIT.class);
+ private MySQLContainer> mysql;
+ private Connection connection;
+
+ @SuppressWarnings("checkstyle:MagicNumber")
+ @Before
+ public void startMysqlContainer() throws InterruptedException, ClassNotFoundException, SQLException {
+ mysql = new MySQLContainer<>(DockerImageName.parse("mysql:8.0"))
+ .withNetwork(NETWORK)
+ .withNetworkAliases("jdbc")
+ .withLogConsumer(new Slf4jLogConsumer(LOGGER));
+ mysql.setPortBindings(Lists.newArrayList("3306:3306"));
+ Startables.deepStart(Stream.of(mysql)).join();
+ LOGGER.info("Jdbc container started");
+ // wait for clickhouse fully start
+ Thread.sleep(5000L);
+ Class.forName(mysql.getDriverClassName());
+ connection = DriverManager.getConnection(mysql.getJdbcUrl(), mysql.getUsername(), mysql.getPassword());
+ initializeJdbcTable();
+ batchInsertData();
+ }
+
+ private void initializeJdbcTable() throws SQLException {
+ Statement statement = connection.createStatement();
+ String sql = "CREATE TABLE test (\n" +
+ " name varchar(255) NOT NULL,\n" +
+ " age int NOT NULL\n" +
+ ")";
+ statement.executeUpdate(sql);
+ statement.close();
+ }
+
+ @SuppressWarnings("checkstyle:MagicNumber")
+ private void batchInsertData() throws SQLException {
+ String sql = "insert into test(name,age) values(?,?)";
+ connection.setAutoCommit(false);
+ PreparedStatement preparedStatement = connection.prepareStatement(sql);
+ for (int i = 0; i < 10; i++) {
+ preparedStatement.setString(1, "Mike");
+ preparedStatement.setInt(2, 20);
+ preparedStatement.addBatch();
+ }
+ preparedStatement.executeBatch();
+ connection.commit();
+ connection.close();
+ }
+
+ @Test
+ public void testFakeSourceToJdbcSink() throws SQLException, IOException, InterruptedException {
+ Container.ExecResult execResult = executeSeaTunnelFlinkJob("/jdbc/jdbcsource_to_console.conf");
+ Assert.assertEquals(0, execResult.getExitCode());
+ }
+
+ @After
+ public void closeMysqlContainer() {
+ if (mysql != null) {
+ mysql.stop();
+ }
+ }
+}
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/jdbc/fakesource_to_jdbc.conf b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/jdbc/fakesource_to_jdbc.conf
new file mode 100644
index 00000000000..5a543045d99
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/jdbc/fakesource_to_jdbc.conf
@@ -0,0 +1,62 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+ # You can set flink configuration here
+ execution.parallelism = 1
+ job.mode = "BATCH"
+ #execution.checkpoint.interval = 10000
+ #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+ # This is a example source plugin **only for test and demonstrate the feature source plugin**
+ FakeSource {
+ result_table_name = "fake"
+ field_name = "name,age"
+ }
+
+ # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
+ # please go to https://seatunnel.apache.org/docs/flink/configuration/source-plugins/Fake
+}
+
+transform {
+ sql {
+ sql = "select name,age from fake"
+ }
+
+ # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
+ # please go to https://seatunnel.apache.org/docs/flink/configuration/transform-plugins/Sql
+}
+
+sink {
+ Jdbc {
+ source_table_name = fake
+ driver = com.mysql.cj.jdbc.Driver
+ url = "jdbc:mysql://jdbc:3306/test"
+ user = test
+ password = test
+ query = "insert into test(name,age) values(?,?)"
+ batch_size = 2
+ }
+
+ # If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
+ # please go to https://seatunnel.apache.org/docs/flink/configuration/sink-plugins/Console
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/jdbc/jdbcsource_to_console.conf b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/jdbc/jdbcsource_to_console.conf
new file mode 100644
index 00000000000..77acadb24b5
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/jdbc/jdbcsource_to_console.conf
@@ -0,0 +1,53 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+ # You can set flink configuration here
+ execution.parallelism = 1
+ job.mode = "BATCH"
+ #execution.checkpoint.interval = 10000
+ #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+ # This is a example source plugin **only for test and demonstrate the feature source plugin**
+ Jdbc {
+ driver = com.mysql.cj.jdbc.Driver
+ url = "jdbc:mysql://jdbc:3306/test"
+ user = test
+ password = test
+ query = "select * from test"
+ }
+
+ # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
+ # please go to https://seatunnel.apache.org/docs/flink/configuration/source-plugins/Fake
+}
+
+transform {
+
+ # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
+ # please go to https://seatunnel.apache.org/docs/flink/configuration/transform-plugins/Sql
+}
+
+sink {
+ Console {}
+ # If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
+ # please go to https://seatunnel.apache.org/docs/flink/configuration/sink-plugins/Console
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-spark-e2e/pom.xml b/seatunnel-e2e/seatunnel-spark-e2e/pom.xml
index e6a57972901..9b18f701f3e 100644
--- a/seatunnel-e2e/seatunnel-spark-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-spark-e2e/pom.xml
@@ -41,6 +41,12 @@
org.testcontainerstestcontainers
+
+ org.testcontainers
+ mysql
+ 1.17.3
+ test
+
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/jdbc/FakeSourceToJdbcIT.java b/seatunnel-e2e/seatunnel-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/jdbc/FakeSourceToJdbcIT.java
new file mode 100644
index 00000000000..83f1d85b1e8
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/jdbc/FakeSourceToJdbcIT.java
@@ -0,0 +1,77 @@
+package org.apache.seatunnel.e2e.spark.jdbc;
+
+import org.apache.seatunnel.e2e.spark.SparkContainer;
+
+import com.google.common.collect.Lists;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.MySQLContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerImageName;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.List;
+import java.util.stream.Stream;
+
+public class FakeSourceToJdbcIT extends SparkContainer {
+ private static final Logger LOGGER = LoggerFactory.getLogger(FakeSourceToJdbcIT.class);
+ private MySQLContainer> mysql;
+ private Connection connection;
+
+ @SuppressWarnings("checkstyle:MagicNumber")
+ @Before
+ public void startMysqlContainer() throws InterruptedException, ClassNotFoundException, SQLException {
+ mysql = new MySQLContainer<>(DockerImageName.parse("mysql:8.0"))
+ .withNetwork(NETWORK)
+ .withNetworkAliases("jdbc")
+ .withLogConsumer(new Slf4jLogConsumer(LOGGER));
+ mysql.setPortBindings(Lists.newArrayList("33306:3306"));
+ Startables.deepStart(Stream.of(mysql)).join();
+ LOGGER.info("Jdbc container started");
+ Thread.sleep(5000L);
+ Class.forName(mysql.getDriverClassName());
+ connection = DriverManager.getConnection(mysql.getJdbcUrl(), mysql.getUsername(), mysql.getPassword());
+ initializeJdbcTable();
+ }
+
+ private void initializeJdbcTable() throws SQLException {
+ Statement statement = connection.createStatement();
+ String sql = "CREATE TABLE test (\n" +
+ " name varchar(255) NOT NULL\n" +
+ ")";
+ statement.executeUpdate(sql);
+ statement.close();
+ }
+
+ @Test
+ public void testFakeSourceToJdbcSink() throws SQLException, IOException, InterruptedException {
+ Container.ExecResult execResult = executeSeaTunnelSparkJob("/jdbc/fakesource_to_jdbc.conf");
+ Assert.assertEquals(0, execResult.getExitCode());
+ String sql = "select * from test";
+ Statement statement = connection.createStatement();
+ ResultSet resultSet = statement.executeQuery(sql);
+ List result = Lists.newArrayList();
+ while (resultSet.next()) {
+ result.add(resultSet.getString("name"));
+ }
+ Assert.assertFalse(result.isEmpty());
+ }
+
+ @After
+ public void closeMysqlContainer() {
+ if (mysql != null) {
+ mysql.stop();
+ }
+ }
+}
diff --git a/seatunnel-e2e/seatunnel-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/jdbc/JdbcSourceToConsoleIT.java b/seatunnel-e2e/seatunnel-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/jdbc/JdbcSourceToConsoleIT.java
new file mode 100644
index 00000000000..c2942d704b7
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/jdbc/JdbcSourceToConsoleIT.java
@@ -0,0 +1,85 @@
+package org.apache.seatunnel.e2e.spark.jdbc;
+
+import org.apache.seatunnel.e2e.spark.SparkContainer;
+
+import com.google.common.collect.Lists;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.MySQLContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerImageName;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.stream.Stream;
+
+public class JdbcSourceToConsoleIT extends SparkContainer {
+ private static final Logger LOGGER = LoggerFactory.getLogger(JdbcSourceToConsoleIT.class);
+ private MySQLContainer> mysql;
+ private Connection connection;
+
+ @SuppressWarnings("checkstyle:MagicNumber")
+ @Before
+ public void startMysqlContainer() throws InterruptedException, ClassNotFoundException, SQLException {
+ mysql = new MySQLContainer<>(DockerImageName.parse("mysql:8.0"))
+ .withNetwork(NETWORK)
+ .withNetworkAliases("jdbc")
+ .withLogConsumer(new Slf4jLogConsumer(LOGGER));
+ mysql.setPortBindings(Lists.newArrayList("33306:3306"));
+ Startables.deepStart(Stream.of(mysql)).join();
+ LOGGER.info("Jdbc container started");
+ Thread.sleep(5000L);
+ Class.forName(mysql.getDriverClassName());
+ connection = DriverManager.getConnection(mysql.getJdbcUrl(), mysql.getUsername(), mysql.getPassword());
+ initializeJdbcTable();
+ batchInsertData();
+ }
+
+ private void initializeJdbcTable() throws SQLException {
+ Statement statement = connection.createStatement();
+ String sql = "CREATE TABLE test (\n" +
+ " name varchar(255) NOT NULL,\n" +
+ " age int NOT NULL\n" +
+ ")";
+ statement.executeUpdate(sql);
+ statement.close();
+ }
+
+ @SuppressWarnings("checkstyle:MagicNumber")
+ private void batchInsertData() throws SQLException {
+ String sql = "insert into test(name,age) values(?,?)";
+ connection.setAutoCommit(false);
+ PreparedStatement preparedStatement = connection.prepareStatement(sql);
+ for (int i = 0; i < 10; i++) {
+ preparedStatement.setString(1, "Mike");
+ preparedStatement.setInt(2, 20);
+ preparedStatement.addBatch();
+ }
+ preparedStatement.executeBatch();
+ connection.commit();
+ connection.close();
+ }
+
+ @Test
+ public void testFakeSourceToJdbcSink() throws SQLException, IOException, InterruptedException {
+ Container.ExecResult execResult = executeSeaTunnelSparkJob("/jdbc/jdbcsource_to_console.conf");
+ Assert.assertEquals(0, execResult.getExitCode());
+ }
+
+ @After
+ public void closeMysqlContainer() {
+ if (mysql != null) {
+ mysql.stop();
+ }
+ }
+}
diff --git a/seatunnel-e2e/seatunnel-spark-e2e/src/test/resources/jdbc/fakesource_to_jdbc.conf b/seatunnel-e2e/seatunnel-spark-e2e/src/test/resources/jdbc/fakesource_to_jdbc.conf
new file mode 100644
index 00000000000..598efd7b8b2
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-spark-e2e/src/test/resources/jdbc/fakesource_to_jdbc.conf
@@ -0,0 +1,60 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 2
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+}
+
+source {
+ # This is a example source plugin **only for test and demonstrate the feature source plugin**
+ Fake {
+ result_table_name = "fake"
+ }
+
+ # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
+ # please go to https://seatunnel.apache.org/docs/flink/configuration/source-plugins/Fake
+}
+
+transform {
+ # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
+ # please go to https://seatunnel.apache.org/docs/flink/configuration/transform-plugins/Sql
+}
+
+sink {
+ jdbc {
+ driver = com.mysql.cj.jdbc.Driver
+ saveMode = "update",
+ truncate = "true",
+ url = "jdbc:mysql://jdbc:3306/test",
+ user = "test",
+ password = "test",
+ dbTable = "fake",
+ customUpdateStmt = "insert into test(name) values(?)"
+ jdbc.connect_timeout = 10000
+ jdbc.socket_timeout = 10000
+ }
+
+ # If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
+ # please go to https://seatunnel.apache.org/docs/flink/configuration/sink-plugins/Console
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-spark-e2e/src/test/resources/jdbc/jdbcsource_to_console.conf b/seatunnel-e2e/seatunnel-spark-e2e/src/test/resources/jdbc/jdbcsource_to_console.conf
new file mode 100644
index 00000000000..1ad856e0135
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-spark-e2e/src/test/resources/jdbc/jdbcsource_to_console.conf
@@ -0,0 +1,53 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 2
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+}
+
+source {
+ # This is a example source plugin **only for test and demonstrate the feature source plugin**
+ jdbc {
+ driver = com.mysql.cj.jdbc.Driver
+ url = "jdbc:mysql://jdbc:3306/test"
+ table = "test"
+ result_table_name = "test_log"
+ user = "test"
+ password = "test"
+ }
+ # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
+ # please go to https://seatunnel.apache.org/docs/flink/configuration/source-plugins/Fake
+}
+
+transform {
+ # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
+ # please go to https://seatunnel.apache.org/docs/flink/configuration/transform-plugins/Sql
+}
+
+sink {
+ Console {}
+
+ # If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
+ # please go to https://seatunnel.apache.org/docs/flink/configuration/sink-plugins/Console
+}
\ No newline at end of file
From cdffad0e3ab206dd432c282a1366e58b863d0582 Mon Sep 17 00:00:00 2001
From: zhangyuge1 <1728375982@qq.com>
Date: Mon, 25 Jul 2022 22:37:48 +0800
Subject: [PATCH 10/17] add jdbc(postgresql) e2e test
---
.idea/vcs.xml | 28 +++-------
seatunnel-core/seatunnel-core-spark/pom.xml | 4 +-
.../seatunnel-flink-starter/pom.xml | 12 ++---
.../seatunnel-flink-connector-v2-e2e/pom.xml | 8 ++-
.../e2e/flink/v2/jdbc/FakeSourceToJdbcIT.java | 48 +++++++++++------
.../flink/v2/jdbc/JdbcSourceToConsoleIT.java | 52 ++++++++++++-------
.../resources/jdbc/fakesource_to_jdbc.conf | 11 ++--
.../resources/jdbc/jdbcsource_to_console.conf | 6 +--
seatunnel-e2e/seatunnel-spark-e2e/pom.xml | 7 ++-
.../e2e/spark/jdbc/FakeSourceToJdbcIT.java | 42 ++++++++++-----
.../e2e/spark/jdbc/JdbcSourceToConsoleIT.java | 43 ++++++++++-----
.../resources/jdbc/fakesource_to_jdbc.conf | 4 +-
.../resources/jdbc/jdbcsource_to_console.conf | 4 +-
13 files changed, 162 insertions(+), 107 deletions(-)
diff --git a/.idea/vcs.xml b/.idea/vcs.xml
index 59c36d8ab14..e5899791808 100644
--- a/.idea/vcs.xml
+++ b/.idea/vcs.xml
@@ -1,32 +1,16 @@
-
-
-
-
-
+
-
+
-
+
+
+
+
\ No newline at end of file
diff --git a/seatunnel-core/seatunnel-core-spark/pom.xml b/seatunnel-core/seatunnel-core-spark/pom.xml
index 21b4630230d..e11948b5116 100644
--- a/seatunnel-core/seatunnel-core-spark/pom.xml
+++ b/seatunnel-core/seatunnel-core-spark/pom.xml
@@ -120,8 +120,8 @@
- mysql
- mysql-connector-java
+ org.postgresql
+ postgresqlcompile
diff --git a/seatunnel-core/seatunnel-flink-starter/pom.xml b/seatunnel-core/seatunnel-flink-starter/pom.xml
index 9253738069a..df48449e987 100644
--- a/seatunnel-core/seatunnel-flink-starter/pom.xml
+++ b/seatunnel-core/seatunnel-flink-starter/pom.xml
@@ -103,17 +103,17 @@
junit
-
- mysql
- mysql-connector-java
- compile
-
-
org.apache.seatunnelconnector-jdbc${project.version}
+
+
+ org.postgresql
+ postgresql
+ compile
+
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml
index 2aeef2f22d5..454a41c9562 100644
--- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml
@@ -39,11 +39,17 @@
org.testcontainers
- mysql
+ postgresql1.17.3test
+
+ org.postgresql
+ postgresql
+ test
+
+
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/FakeSourceToJdbcIT.java b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/FakeSourceToJdbcIT.java
index fe244fda2f3..dc928acc31e 100644
--- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/FakeSourceToJdbcIT.java
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/FakeSourceToJdbcIT.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.seatunnel.e2e.flink.v2.jdbc;
import org.apache.seatunnel.e2e.flink.FlinkContainer;
@@ -10,7 +27,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.Container;
-import org.testcontainers.containers.MySQLContainer;
+import org.testcontainers.containers.PostgreSQLContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.lifecycle.Startables;
import org.testcontainers.utility.DockerImageName;
@@ -26,32 +43,30 @@
public class FakeSourceToJdbcIT extends FlinkContainer {
private static final Logger LOGGER = LoggerFactory.getLogger(FakeSourceToJdbcIT.class);
- private MySQLContainer> mysql;
+ private PostgreSQLContainer> psl;
private Connection connection;
@SuppressWarnings("checkstyle:MagicNumber")
@Before
- public void startMysqlContainer() throws InterruptedException, ClassNotFoundException, SQLException {
- mysql = new MySQLContainer<>(DockerImageName.parse("mysql:8.0"))
+ public void startPostgreSqlContainer() throws InterruptedException, ClassNotFoundException, SQLException {
+ psl = new PostgreSQLContainer<>(DockerImageName.parse("postgres:alpine3.16"))
.withNetwork(NETWORK)
- .withNetworkAliases("jdbc")
+ .withNetworkAliases("postgresql")
.withLogConsumer(new Slf4jLogConsumer(LOGGER));
- mysql.setPortBindings(Lists.newArrayList("3306:3306"));
- Startables.deepStart(Stream.of(mysql)).join();
- LOGGER.info("Jdbc container started");
+ Startables.deepStart(Stream.of(psl)).join();
+ LOGGER.info("PostgreSql container started");
Thread.sleep(5000L);
- Class.forName(mysql.getDriverClassName());
- connection = DriverManager.getConnection(mysql.getJdbcUrl(), mysql.getUsername(), mysql.getPassword());
+ Class.forName(psl.getDriverClassName());
+ connection = DriverManager.getConnection(psl.getJdbcUrl(), psl.getUsername(), psl.getPassword());
initializeJdbcTable();
}
private void initializeJdbcTable() throws SQLException {
Statement statement = connection.createStatement();
String sql = "CREATE TABLE test (\n" +
- " name varchar(255) NOT NULL,\n" +
- " age int NOT NULL\n" +
+ " name varchar(255) NOT NULL\n" +
")";
- statement.executeUpdate(sql);
+ statement.execute(sql);
statement.close();
}
@@ -68,12 +83,13 @@ public void testFakeSourceToJdbcSink() throws SQLException, IOException, Interru
result.add(resultSet.getString("name"));
}
Assert.assertFalse(result.isEmpty());
+ connection.close();
}
@After
- public void closeMysqlContainer() {
- if (mysql != null) {
- mysql.stop();
+ public void closeClickHouseContainer() {
+ if (psl != null) {
+ psl.stop();
}
}
}
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcSourceToConsoleIT.java b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcSourceToConsoleIT.java
index c19d76570a1..9151f001f52 100644
--- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcSourceToConsoleIT.java
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcSourceToConsoleIT.java
@@ -1,8 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.seatunnel.e2e.flink.v2.jdbc;
import org.apache.seatunnel.e2e.flink.FlinkContainer;
-import com.google.common.collect.Lists;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -10,7 +26,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.Container;
-import org.testcontainers.containers.MySQLContainer;
+import org.testcontainers.containers.PostgreSQLContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.lifecycle.Startables;
import org.testcontainers.utility.DockerImageName;
@@ -25,23 +41,21 @@
public class JdbcSourceToConsoleIT extends FlinkContainer {
private static final Logger LOGGER = LoggerFactory.getLogger(JdbcSourceToConsoleIT.class);
- private MySQLContainer> mysql;
+ private PostgreSQLContainer> psl;
private Connection connection;
@SuppressWarnings("checkstyle:MagicNumber")
@Before
- public void startMysqlContainer() throws InterruptedException, ClassNotFoundException, SQLException {
- mysql = new MySQLContainer<>(DockerImageName.parse("mysql:8.0"))
+ public void startPostgreSqlContainer() throws InterruptedException, ClassNotFoundException, SQLException {
+ psl = new PostgreSQLContainer<>(DockerImageName.parse("postgres:alpine3.16"))
.withNetwork(NETWORK)
- .withNetworkAliases("jdbc")
+ .withNetworkAliases("postgresql")
.withLogConsumer(new Slf4jLogConsumer(LOGGER));
- mysql.setPortBindings(Lists.newArrayList("3306:3306"));
- Startables.deepStart(Stream.of(mysql)).join();
- LOGGER.info("Jdbc container started");
- // wait for clickhouse fully start
+ Startables.deepStart(Stream.of(psl)).join();
+ LOGGER.info("PostgreSql container started");
Thread.sleep(5000L);
- Class.forName(mysql.getDriverClassName());
- connection = DriverManager.getConnection(mysql.getJdbcUrl(), mysql.getUsername(), mysql.getPassword());
+ Class.forName(psl.getDriverClassName());
+ connection = DriverManager.getConnection(psl.getJdbcUrl(), psl.getUsername(), psl.getPassword());
initializeJdbcTable();
batchInsertData();
}
@@ -49,21 +63,19 @@ public void startMysqlContainer() throws InterruptedException, ClassNotFoundExce
private void initializeJdbcTable() throws SQLException {
Statement statement = connection.createStatement();
String sql = "CREATE TABLE test (\n" +
- " name varchar(255) NOT NULL,\n" +
- " age int NOT NULL\n" +
+ " name varchar(255) NOT NULL\n" +
")";
- statement.executeUpdate(sql);
+ statement.execute(sql);
statement.close();
}
@SuppressWarnings("checkstyle:MagicNumber")
private void batchInsertData() throws SQLException {
- String sql = "insert into test(name,age) values(?,?)";
+ String sql = "insert into test(name) values(?)";
connection.setAutoCommit(false);
PreparedStatement preparedStatement = connection.prepareStatement(sql);
for (int i = 0; i < 10; i++) {
preparedStatement.setString(1, "Mike");
- preparedStatement.setInt(2, 20);
preparedStatement.addBatch();
}
preparedStatement.executeBatch();
@@ -78,9 +90,9 @@ public void testFakeSourceToJdbcSink() throws SQLException, IOException, Interru
}
@After
- public void closeMysqlContainer() {
- if (mysql != null) {
- mysql.stop();
+ public void closePostgreSqlContainer() {
+ if (psl != null) {
+ psl.stop();
}
}
}
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/jdbc/fakesource_to_jdbc.conf b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/jdbc/fakesource_to_jdbc.conf
index 5a543045d99..9640e19c228 100644
--- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/jdbc/fakesource_to_jdbc.conf
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/jdbc/fakesource_to_jdbc.conf
@@ -30,7 +30,7 @@ source {
# This is a example source plugin **only for test and demonstrate the feature source plugin**
FakeSource {
result_table_name = "fake"
- field_name = "name,age"
+ field_name = "name"
}
# If you would like to get more information about how to configure seatunnel and see full list of source plugins,
@@ -39,7 +39,7 @@ source {
transform {
sql {
- sql = "select name,age from fake"
+ sql = "select name from fake"
}
# If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
@@ -49,12 +49,11 @@ transform {
sink {
Jdbc {
source_table_name = fake
- driver = com.mysql.cj.jdbc.Driver
- url = "jdbc:mysql://jdbc:3306/test"
+ driver = org.postgresql.Driver
+ url = "jdbc:postgresql://postgresql:5432/test?loggerLevel=OFF"
user = test
password = test
- query = "insert into test(name,age) values(?,?)"
- batch_size = 2
+ query = "insert into test(name) values(?)"
}
# If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/jdbc/jdbcsource_to_console.conf b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/jdbc/jdbcsource_to_console.conf
index 77acadb24b5..6862abc04c2 100644
--- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/jdbc/jdbcsource_to_console.conf
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/jdbc/jdbcsource_to_console.conf
@@ -29,10 +29,10 @@ env {
source {
# This is a example source plugin **only for test and demonstrate the feature source plugin**
Jdbc {
- driver = com.mysql.cj.jdbc.Driver
- url = "jdbc:mysql://jdbc:3306/test"
+ driver = org.postgresql.Driver
+ url = "jdbc:postgresql://postgresql:5432/test?loggerLevel=OFF"
user = test
- password = test
+ password = "test"
query = "select * from test"
}
diff --git a/seatunnel-e2e/seatunnel-spark-e2e/pom.xml b/seatunnel-e2e/seatunnel-spark-e2e/pom.xml
index 9b18f701f3e..85a51616522 100644
--- a/seatunnel-e2e/seatunnel-spark-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-spark-e2e/pom.xml
@@ -43,10 +43,15 @@
org.testcontainers
- mysql
+ postgresql1.17.3test
+
+ org.postgresql
+ postgresql
+ test
+
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/jdbc/FakeSourceToJdbcIT.java b/seatunnel-e2e/seatunnel-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/jdbc/FakeSourceToJdbcIT.java
index 83f1d85b1e8..e37c9414f49 100644
--- a/seatunnel-e2e/seatunnel-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/jdbc/FakeSourceToJdbcIT.java
+++ b/seatunnel-e2e/seatunnel-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/jdbc/FakeSourceToJdbcIT.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.seatunnel.e2e.spark.jdbc;
import org.apache.seatunnel.e2e.spark.SparkContainer;
@@ -10,7 +27,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.Container;
-import org.testcontainers.containers.MySQLContainer;
+import org.testcontainers.containers.PostgreSQLContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.lifecycle.Startables;
import org.testcontainers.utility.DockerImageName;
@@ -26,22 +43,21 @@
public class FakeSourceToJdbcIT extends SparkContainer {
private static final Logger LOGGER = LoggerFactory.getLogger(FakeSourceToJdbcIT.class);
- private MySQLContainer> mysql;
+ private PostgreSQLContainer> psl;
private Connection connection;
@SuppressWarnings("checkstyle:MagicNumber")
@Before
- public void startMysqlContainer() throws InterruptedException, ClassNotFoundException, SQLException {
- mysql = new MySQLContainer<>(DockerImageName.parse("mysql:8.0"))
+ public void startPostgreSqlContainer() throws InterruptedException, ClassNotFoundException, SQLException {
+ psl = new PostgreSQLContainer<>(DockerImageName.parse("postgres:alpine3.16"))
.withNetwork(NETWORK)
- .withNetworkAliases("jdbc")
+ .withNetworkAliases("postgresql")
.withLogConsumer(new Slf4jLogConsumer(LOGGER));
- mysql.setPortBindings(Lists.newArrayList("33306:3306"));
- Startables.deepStart(Stream.of(mysql)).join();
- LOGGER.info("Jdbc container started");
+ Startables.deepStart(Stream.of(psl)).join();
+ LOGGER.info("PostgreSql container started");
Thread.sleep(5000L);
- Class.forName(mysql.getDriverClassName());
- connection = DriverManager.getConnection(mysql.getJdbcUrl(), mysql.getUsername(), mysql.getPassword());
+ Class.forName(psl.getDriverClassName());
+ connection = DriverManager.getConnection(psl.getJdbcUrl(), psl.getUsername(), psl.getPassword());
initializeJdbcTable();
}
@@ -69,9 +85,9 @@ public void testFakeSourceToJdbcSink() throws SQLException, IOException, Interru
}
@After
- public void closeMysqlContainer() {
- if (mysql != null) {
- mysql.stop();
+ public void closePostgreSqlContainer() {
+ if (psl != null) {
+ psl.stop();
}
}
}
diff --git a/seatunnel-e2e/seatunnel-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/jdbc/JdbcSourceToConsoleIT.java b/seatunnel-e2e/seatunnel-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/jdbc/JdbcSourceToConsoleIT.java
index c2942d704b7..4ed014e5223 100644
--- a/seatunnel-e2e/seatunnel-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/jdbc/JdbcSourceToConsoleIT.java
+++ b/seatunnel-e2e/seatunnel-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/jdbc/JdbcSourceToConsoleIT.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.seatunnel.e2e.spark.jdbc;
import org.apache.seatunnel.e2e.spark.SparkContainer;
@@ -10,7 +27,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.Container;
-import org.testcontainers.containers.MySQLContainer;
+import org.testcontainers.containers.PostgreSQLContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.lifecycle.Startables;
import org.testcontainers.utility.DockerImageName;
@@ -25,22 +42,22 @@
public class JdbcSourceToConsoleIT extends SparkContainer {
private static final Logger LOGGER = LoggerFactory.getLogger(JdbcSourceToConsoleIT.class);
- private MySQLContainer> mysql;
+ private PostgreSQLContainer> psl;
private Connection connection;
@SuppressWarnings("checkstyle:MagicNumber")
@Before
- public void startMysqlContainer() throws InterruptedException, ClassNotFoundException, SQLException {
- mysql = new MySQLContainer<>(DockerImageName.parse("mysql:8.0"))
+ public void startPostgreSqlContainer() throws InterruptedException, ClassNotFoundException, SQLException {
+ psl = new PostgreSQLContainer<>(DockerImageName.parse("postgres:alpine3.16"))
.withNetwork(NETWORK)
- .withNetworkAliases("jdbc")
+ .withNetworkAliases("postgresql")
.withLogConsumer(new Slf4jLogConsumer(LOGGER));
- mysql.setPortBindings(Lists.newArrayList("33306:3306"));
- Startables.deepStart(Stream.of(mysql)).join();
- LOGGER.info("Jdbc container started");
+ psl.setPortBindings(Lists.newArrayList("33306:3306"));
+ Startables.deepStart(Stream.of(psl)).join();
+ LOGGER.info("PostgreSql container started");
Thread.sleep(5000L);
- Class.forName(mysql.getDriverClassName());
- connection = DriverManager.getConnection(mysql.getJdbcUrl(), mysql.getUsername(), mysql.getPassword());
+ Class.forName(psl.getDriverClassName());
+ connection = DriverManager.getConnection(psl.getJdbcUrl(), psl.getUsername(), psl.getPassword());
initializeJdbcTable();
batchInsertData();
}
@@ -77,9 +94,9 @@ public void testFakeSourceToJdbcSink() throws SQLException, IOException, Interru
}
@After
- public void closeMysqlContainer() {
- if (mysql != null) {
- mysql.stop();
+ public void closePostgreSqlContainer() {
+ if (psl != null) {
+ psl.stop();
}
}
}
diff --git a/seatunnel-e2e/seatunnel-spark-e2e/src/test/resources/jdbc/fakesource_to_jdbc.conf b/seatunnel-e2e/seatunnel-spark-e2e/src/test/resources/jdbc/fakesource_to_jdbc.conf
index 598efd7b8b2..8aae4fe31c5 100644
--- a/seatunnel-e2e/seatunnel-spark-e2e/src/test/resources/jdbc/fakesource_to_jdbc.conf
+++ b/seatunnel-e2e/seatunnel-spark-e2e/src/test/resources/jdbc/fakesource_to_jdbc.conf
@@ -43,10 +43,10 @@ transform {
sink {
jdbc {
- driver = com.mysql.cj.jdbc.Driver
+ driver = org.postgresql.Driver
saveMode = "update",
truncate = "true",
- url = "jdbc:mysql://jdbc:3306/test",
+ url = "jdbc:postgresql://postgresql:5432/test?loggerLevel=OFF",
user = "test",
password = "test",
dbTable = "fake",
diff --git a/seatunnel-e2e/seatunnel-spark-e2e/src/test/resources/jdbc/jdbcsource_to_console.conf b/seatunnel-e2e/seatunnel-spark-e2e/src/test/resources/jdbc/jdbcsource_to_console.conf
index 1ad856e0135..d9555f20be5 100644
--- a/seatunnel-e2e/seatunnel-spark-e2e/src/test/resources/jdbc/jdbcsource_to_console.conf
+++ b/seatunnel-e2e/seatunnel-spark-e2e/src/test/resources/jdbc/jdbcsource_to_console.conf
@@ -29,8 +29,8 @@ env {
source {
# This is a example source plugin **only for test and demonstrate the feature source plugin**
jdbc {
- driver = com.mysql.cj.jdbc.Driver
- url = "jdbc:mysql://jdbc:3306/test"
+ driver = org.postgresql.Driver
+ url = "jdbc:postgresql://postgresql:5432/test?loggerLevel=OFF",
table = "test"
result_table_name = "test_log"
user = "test"
From f4475f668cd2ac64eba9d688ef0d11d44315d1de Mon Sep 17 00:00:00 2001
From: zhangyuge1 <1728375982@qq.com>
Date: Tue, 26 Jul 2022 19:29:27 +0800
Subject: [PATCH 11/17] add pdjdbc license
---
LICENSE | 5 +++-
seatunnel-dist/release-docs/LICENSE | 1 +
.../release-docs/licenses/LICENSE-pgjdbc.txt | 23 +++++++++++++++++++
tools/dependencies/known-dependencies.txt | 3 ++-
4 files changed, 30 insertions(+), 2 deletions(-)
create mode 100644 seatunnel-dist/release-docs/licenses/LICENSE-pgjdbc.txt
diff --git a/LICENSE b/LICENSE
index ca1720aba68..0592259a9b6 100644
--- a/LICENSE
+++ b/LICENSE
@@ -215,4 +215,7 @@ seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade
seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/impl/Path.java from https://github.com/lightbend/config
seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/impl/PathParser.java from https://github.com/lightbend/config
seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/ConfigParseOptions.java from https://github.com/lightbend/config
-seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/SimpleConfigObject.java from https://github.com/lightbend/config
\ No newline at end of file
+seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/SimpleConfigObject.java from https://github.com/lightbend/config
+
+This product bundles PostgreSQL JDBC 4.2 Driver, which is available under a
+"BSD 2-Clause Simplified" license. For details, see https://jdbc.postgresql.org/.
\ No newline at end of file
diff --git a/seatunnel-dist/release-docs/LICENSE b/seatunnel-dist/release-docs/LICENSE
index 8e3f37ff1c0..dccdd224dde 100644
--- a/seatunnel-dist/release-docs/LICENSE
+++ b/seatunnel-dist/release-docs/LICENSE
@@ -1045,6 +1045,7 @@ The text of each license is also included at licenses/LICENSE-[project].txt.
(The BSD License) xmlenc Library (xmlenc:xmlenc:0.52 - http://xmlenc.sourceforge.net)
(The BSD License) ASM Core (asm:asm:3.1 - http://asm.objectweb.org/asm/)
(New BSD License) Janino (org.codehaus.janino:janino:3.1.6 - http://docs.codehaus.org/display/JANINO/Home/janino)
+ (BSD 2-Clause License) pgjdbc (org.postgresql:postgresql:42.3.3 - https://jdbc.postgresql.org)
========================================================================
diff --git a/seatunnel-dist/release-docs/licenses/LICENSE-pgjdbc.txt b/seatunnel-dist/release-docs/licenses/LICENSE-pgjdbc.txt
new file mode 100644
index 00000000000..98dff7b6ee1
--- /dev/null
+++ b/seatunnel-dist/release-docs/licenses/LICENSE-pgjdbc.txt
@@ -0,0 +1,23 @@
+Copyright (c) 1997, PostgreSQL Global Development Group
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are met:
+
+1. Redistributions of source code must retain the above copyright notice,
+ this list of conditions and the following disclaimer.
+2. Redistributions in binary form must reproduce the above copyright notice,
+ this list of conditions and the following disclaimer in the documentation
+ and/or other materials provided with the distribution.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+POSSIBILITY OF SUCH DAMAGE.
\ No newline at end of file
diff --git a/tools/dependencies/known-dependencies.txt b/tools/dependencies/known-dependencies.txt
index c7b8aaca294..8de30fc22b4 100755
--- a/tools/dependencies/known-dependencies.txt
+++ b/tools/dependencies/known-dependencies.txt
@@ -731,4 +731,5 @@ zookeeper-jute-3.5.9.jar
zstd-jni-1.3.3-1.jar
zstd-jni-1.4.3-1.jar
jakarta.activation-api-1.2.1.jar
-jakarta.xml.bind-api-2.3.2.jar
\ No newline at end of file
+jakarta.xml.bind-api-2.3.2.jar
+postgresql-42.3.3.jar
\ No newline at end of file
From d1dd054c522b16ca43001ba0461a3b114cdf6c01 Mon Sep 17 00:00:00 2001
From: zhangyuge1 <1728375982@qq.com>
Date: Tue, 26 Jul 2022 19:35:19 +0800
Subject: [PATCH 12/17] add pgjdbc license
---
LICENSE | 5 +----
1 file changed, 1 insertion(+), 4 deletions(-)
diff --git a/LICENSE b/LICENSE
index 0592259a9b6..ca1720aba68 100644
--- a/LICENSE
+++ b/LICENSE
@@ -215,7 +215,4 @@ seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade
seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/impl/Path.java from https://github.com/lightbend/config
seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/impl/PathParser.java from https://github.com/lightbend/config
seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/ConfigParseOptions.java from https://github.com/lightbend/config
-seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/SimpleConfigObject.java from https://github.com/lightbend/config
-
-This product bundles PostgreSQL JDBC 4.2 Driver, which is available under a
-"BSD 2-Clause Simplified" license. For details, see https://jdbc.postgresql.org/.
\ No newline at end of file
+seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/SimpleConfigObject.java from https://github.com/lightbend/config
\ No newline at end of file
From 3f003295b635739fa5cddd69297d74006ca34b2b Mon Sep 17 00:00:00 2001
From: zhangyuge1 <1728375982@qq.com>
Date: Tue, 26 Jul 2022 20:02:22 +0800
Subject: [PATCH 13/17] fix vcs.xml
---
.idea/vcs.xml | 22 +++++++++++++++++++---
1 file changed, 19 insertions(+), 3 deletions(-)
diff --git a/.idea/vcs.xml b/.idea/vcs.xml
index e5899791808..bee7eaa5380 100644
--- a/.idea/vcs.xml
+++ b/.idea/vcs.xml
@@ -1,5 +1,24 @@
+
+
+
+
@@ -10,7 +29,4 @@
-
-
-
\ No newline at end of file
From c8de56c8dcbb0afcb0da3d10bae365833edec97c Mon Sep 17 00:00:00 2001
From: zhangyuge1 <1728375982@qq.com>
Date: Wed, 27 Jul 2022 15:47:13 +0800
Subject: [PATCH 14/17] add checker-qual-3.5.0 license
---
seatunnel-dist/release-docs/LICENSE | 1 +
tools/dependencies/known-dependencies.txt | 3 ++-
2 files changed, 3 insertions(+), 1 deletion(-)
diff --git a/seatunnel-dist/release-docs/LICENSE b/seatunnel-dist/release-docs/LICENSE
index dccdd224dde..288f1832380 100644
--- a/seatunnel-dist/release-docs/LICENSE
+++ b/seatunnel-dist/release-docs/LICENSE
@@ -974,6 +974,7 @@ The text of each license is also included at licenses/LICENSE-[project].txt.
(The MIT License (MIT)) influxdb java bindings (org.influxdb:influxdb-java:2.22 - http://www.influxdb.org)
(The MIT License) Checker Qual (org.checkerframework:checker-qual:3.10.0 - https://checkerframework.org)
(The MIT License) Checker Qual (org.checkerframework:checker-qual:3.4.0 - https://checkerframework.org)
+ (The MIT License) Checker Qual (org.checkerframework:checker-qual:3.5.0 - https://checkerframework.org)
(The MIT License) JOpt Simple (net.sf.jopt-simple:jopt-simple:5.0.2 - http://pholser.github.io/jopt-simple)
diff --git a/tools/dependencies/known-dependencies.txt b/tools/dependencies/known-dependencies.txt
index 8de30fc22b4..6a1800f119f 100755
--- a/tools/dependencies/known-dependencies.txt
+++ b/tools/dependencies/known-dependencies.txt
@@ -732,4 +732,5 @@ zstd-jni-1.3.3-1.jar
zstd-jni-1.4.3-1.jar
jakarta.activation-api-1.2.1.jar
jakarta.xml.bind-api-2.3.2.jar
-postgresql-42.3.3.jar
\ No newline at end of file
+postgresql-42.3.3.jar
+checker-qual-3.5.0.jar
\ No newline at end of file
From f8255240ecc91f3ffacdf9207ba982ab158a6c67 Mon Sep 17 00:00:00 2001
From: zhangyuge1 <1728375982@qq.com>
Date: Sat, 30 Jul 2022 21:19:25 +0800
Subject: [PATCH 15/17] fix dependency issue
---
.idea/vcs.xml | 6 +--
pom.xml | 7 ++-
.../connector-jdbc/pom.xml | 1 -
.../seatunnel-connector-spark-jdbc/pom.xml | 4 ++
seatunnel-core/seatunnel-core-spark/pom.xml | 6 ---
.../seatunnel-flink-starter/pom.xml | 12 -----
seatunnel-dist/release-docs/LICENSE | 2 -
.../e2e/flink/v2/jdbc/FakeSourceToJdbcIT.java | 34 +++++++-------
.../flink/v2/jdbc/JdbcSourceToConsoleIT.java | 40 +++++++++--------
.../e2e/spark/jdbc/FakeSourceToJdbcIT.java | 33 +++++++-------
.../e2e/spark/jdbc/JdbcSourceToConsoleIT.java | 44 ++++++++++---------
tools/dependencies/known-dependencies.txt | 4 +-
12 files changed, 96 insertions(+), 97 deletions(-)
diff --git a/.idea/vcs.xml b/.idea/vcs.xml
index bee7eaa5380..59c36d8ab14 100644
--- a/.idea/vcs.xml
+++ b/.idea/vcs.xml
@@ -19,14 +19,14 @@
-
+