diff --git a/pom.xml b/pom.xml
index e9e5ff448ff..120c5a73db2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -224,6 +224,7 @@
         1.14.3
         1.3.2
         7.5.1
+        <checker.qual.version>3.10.0</checker.qual.version>
@@ -349,7 +350,6 @@
                 <groupId>org.postgresql</groupId>
                 <artifactId>postgresql</artifactId>
                 <version>${postgresql.version}</version>
-                <scope>test</scope>
@@ -935,6 +935,11 @@
                 <version>${jsoup.version}</version>
+            <dependency>
+                <groupId>org.checkerframework</groupId>
+                <artifactId>checker-qual</artifactId>
+                <version>${checker.qual.version}</version>
+            </dependency>
diff --git a/seatunnel-connectors-v2/connector-jdbc/pom.xml b/seatunnel-connectors-v2/connector-jdbc/pom.xml
index 39839eba700..1bb972ac96c 100644
--- a/seatunnel-connectors-v2/connector-jdbc/pom.xml
+++ b/seatunnel-connectors-v2/connector-jdbc/pom.xml
@@ -46,7 +46,6 @@
             <groupId>org.postgresql</groupId>
             <artifactId>postgresql</artifactId>
-            <scope>provided</scope>
diff --git a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-jdbc/pom.xml b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-jdbc/pom.xml
index 5bcc57ac4b0..485381fb8d4 100644
--- a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-jdbc/pom.xml
+++ b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-jdbc/pom.xml
@@ -56,6 +56,10 @@
             <artifactId>spark-streaming_${scala.binary.version}</artifactId>
+        <dependency>
+            <groupId>org.postgresql</groupId>
+            <artifactId>postgresql</artifactId>
+        </dependency>
\ No newline at end of file
diff --git a/seatunnel-dist/release-docs/LICENSE b/seatunnel-dist/release-docs/LICENSE
index 34c6bcc204d..090806a09d4 100644
--- a/seatunnel-dist/release-docs/LICENSE
+++ b/seatunnel-dist/release-docs/LICENSE
@@ -973,7 +973,6 @@ The text of each license is also included at licenses/LICENSE-[project].txt.
     (MIT-License) spoiwo (com.norbitltd:spoiwo_2.11:1.8.0 - https://github.com/norbert-radyk/spoiwo/)
     (The MIT License (MIT)) influxdb java bindings (org.influxdb:influxdb-java:2.22 - http://www.influxdb.org)
     (The MIT License) Checker Qual (org.checkerframework:checker-qual:3.10.0 - https://checkerframework.org)
-    (The MIT License) Checker Qual (org.checkerframework:checker-qual:3.4.0 - https://checkerframework.org)
     (The MIT License) JOpt Simple (net.sf.jopt-simple:jopt-simple:5.0.2 - http://pholser.github.io/jopt-simple)
@@ -1045,6 +1044,7 @@ The text of each license is also included at licenses/LICENSE-[project].txt.
     (The BSD License) xmlenc Library (xmlenc:xmlenc:0.52 - http://xmlenc.sourceforge.net)
     (The BSD License) ASM Core (asm:asm:3.1 - http://asm.objectweb.org/asm/)
     (New BSD License) Janino (org.codehaus.janino:janino:3.1.6 - http://docs.codehaus.org/display/JANINO/Home/janino)
+    (BSD 2-Clause License) pgjdbc (org.postgresql:postgresql:42.3.3 - https://jdbc.postgresql.org)
 
 ========================================================================
diff --git a/seatunnel-dist/release-docs/licenses/LICENSE-pgjdbc.txt b/seatunnel-dist/release-docs/licenses/LICENSE-pgjdbc.txt
new file mode 100644
index 00000000000..98dff7b6ee1
--- /dev/null
+++ b/seatunnel-dist/release-docs/licenses/LICENSE-pgjdbc.txt
@@ -0,0 +1,23 @@
+Copyright (c) 1997, PostgreSQL Global Development Group
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are met:
+
+1. Redistributions of source code must retain the above copyright notice,
+   this list of conditions and the following disclaimer.
+2. Redistributions in binary form must reproduce the above copyright notice,
+   this list of conditions and the following disclaimer in the documentation
+   and/or other materials provided with the distribution.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+POSSIBILITY OF SUCH DAMAGE.
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml
index c494f80f886..454a41c9562 100644
--- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml
@@ -37,6 +37,19 @@
             <artifactId>testcontainers</artifactId>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>postgresql</artifactId>
+            <version>1.17.3</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.postgresql</groupId>
+            <artifactId>postgresql</artifactId>
+            <scope>test</scope>
+        </dependency>
+
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/FakeSourceToJdbcIT.java b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/FakeSourceToJdbcIT.java
new file mode 100644
index 00000000000..784bd9a060c
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/FakeSourceToJdbcIT.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.flink.v2.jdbc;
+
+import org.apache.seatunnel.e2e.flink.FlinkContainer;
+
+import com.google.common.collect.Lists;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.PostgreSQLContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerImageName;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.List;
+import java.util.stream.Stream;
+
+public class FakeSourceToJdbcIT extends FlinkContainer {
+    private static final Logger LOGGER = LoggerFactory.getLogger(FakeSourceToJdbcIT.class);
+    private PostgreSQLContainer<?> psl;
+
+    @SuppressWarnings("checkstyle:MagicNumber")
+    @BeforeEach
+    public void startPostgreSqlContainer() throws InterruptedException, ClassNotFoundException, SQLException {
+        psl = new PostgreSQLContainer<>(DockerImageName.parse("postgres:alpine3.16"))
+            .withNetwork(NETWORK)
+            .withNetworkAliases("postgresql")
+            .withLogConsumer(new Slf4jLogConsumer(LOGGER));
+        Startables.deepStart(Stream.of(psl)).join();
+        LOGGER.info("PostgreSql container started");
+        Thread.sleep(5000L);
+        Class.forName(psl.getDriverClassName());
+        initializeJdbcTable();
+    }
+
+    private void initializeJdbcTable() {
+        try (Connection connection = DriverManager.getConnection(psl.getJdbcUrl(), psl.getUsername(), psl.getPassword())) {
+            Statement statement = connection.createStatement();
+            String sql = "CREATE TABLE test (\n" +
+                " name varchar(255) NOT NULL\n" +
+                ")";
+            statement.execute(sql);
+        } catch (SQLException e) {
+            throw new RuntimeException("Initializing PostgreSql table failed!", e);
+        }
+    }
+
+    @Test
+    public void testFakeSourceToJdbcSink() throws SQLException, IOException, InterruptedException {
+        Container.ExecResult execResult = executeSeaTunnelFlinkJob("/jdbc/fakesource_to_jdbc.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+        // query result
+        String sql = "select * from test";
+        try (Connection connection = DriverManager.getConnection(psl.getJdbcUrl(), psl.getUsername(), psl.getPassword())) {
+            Statement statement = connection.createStatement();
+            ResultSet resultSet = statement.executeQuery(sql);
+            List<String> result = Lists.newArrayList();
+            while (resultSet.next()) {
+                result.add(resultSet.getString("name"));
+            }
+            Assertions.assertFalse(result.isEmpty());
+        }
+    }
+
+    @AfterEach
+    public void closePostgreSqlContainer() {
+        if (psl != null) {
+            psl.stop();
+        }
+    }
+}
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcSourceToConsoleIT.java b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcSourceToConsoleIT.java
new file mode 100644
index 00000000000..14eba119d06
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcSourceToConsoleIT.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.flink.v2.jdbc;
+
+import org.apache.seatunnel.e2e.flink.FlinkContainer;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.PostgreSQLContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerImageName;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.stream.Stream;
+
+public class JdbcSourceToConsoleIT extends FlinkContainer {
+    private static final Logger LOGGER = LoggerFactory.getLogger(JdbcSourceToConsoleIT.class);
+    private PostgreSQLContainer<?> psl;
+
+    @SuppressWarnings("checkstyle:MagicNumber")
+    @BeforeEach
+    public void startPostgreSqlContainer() throws InterruptedException, ClassNotFoundException, SQLException {
+        psl = new PostgreSQLContainer<>(DockerImageName.parse("postgres:alpine3.16"))
+            .withNetwork(NETWORK)
+            .withNetworkAliases("postgresql")
+            .withLogConsumer(new Slf4jLogConsumer(LOGGER));
+        Startables.deepStart(Stream.of(psl)).join();
+        LOGGER.info("PostgreSql container started");
+        Thread.sleep(5000L);
+        Class.forName(psl.getDriverClassName());
+        initializeJdbcTable();
+        batchInsertData();
+    }
+
+    private void initializeJdbcTable() {
+        try (Connection connection = DriverManager.getConnection(psl.getJdbcUrl(), psl.getUsername(), psl.getPassword())) {
+            Statement statement = connection.createStatement();
+            String sql = "CREATE TABLE test (\n" +
+                " name varchar(255) NOT NULL\n" +
+                ")";
+            statement.execute(sql);
+        } catch (SQLException e) {
+            throw new RuntimeException("Initializing PostgreSql table failed!", e);
+        }
+    }
+
+    @SuppressWarnings("checkstyle:MagicNumber")
+    private void batchInsertData() throws SQLException {
+        try (Connection connection = DriverManager.getConnection(psl.getJdbcUrl(), psl.getUsername(), psl.getPassword())) {
+            String sql = "insert into test(name) values(?)";
+            connection.setAutoCommit(false);
+            PreparedStatement preparedStatement = connection.prepareStatement(sql);
+            for (int i = 0; i < 10; i++) {
+                preparedStatement.setString(1, "Mike");
+                preparedStatement.addBatch();
+            }
+            preparedStatement.executeBatch();
+            connection.commit();
+        } catch (SQLException e) {
+            throw new RuntimeException("Batch insert data failed!", e);
+        }
+    }
+
+    @Test
+    public void testJdbcSourceToConsoleSink() throws SQLException, IOException, InterruptedException {
+        Container.ExecResult execResult = executeSeaTunnelFlinkJob("/jdbc/jdbcsource_to_console.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+    }
+
+    @AfterEach
+    public void closePostgreSqlContainer() {
+        if (psl != null) {
+            psl.stop();
+        }
+    }
+}
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/jdbc/fakesource_to_jdbc.conf b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/jdbc/fakesource_to_jdbc.conf
new file mode 100644
index 00000000000..9640e19c228
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/jdbc/fakesource_to_jdbc.conf
@@ -0,0 +1,61 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in SeaTunnel config
+######
+
+env {
+  # You can set flink configuration here
+  execution.parallelism = 1
+  job.mode = "BATCH"
+  #execution.checkpoint.interval = 10000
+  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+  # This is an example source plugin **only for test and demonstrate the feature source plugin**
+  FakeSource {
+    result_table_name = "fake"
+    field_name = "name"
+  }
+
+  # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
+  # please go to https://seatunnel.apache.org/docs/flink/configuration/source-plugins/Fake
+}
+
+transform {
+  sql {
+    sql = "select name from fake"
+  }
+
+  # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
+  # please go to https://seatunnel.apache.org/docs/flink/configuration/transform-plugins/Sql
+}
+
+sink {
+  Jdbc {
+    source_table_name = fake
+    driver = org.postgresql.Driver
+    url = "jdbc:postgresql://postgresql:5432/test?loggerLevel=OFF"
+    user = test
+    password = test
+    query = "insert into test(name) values(?)"
+  }
+
+  # If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
+  # please go to https://seatunnel.apache.org/docs/flink/configuration/sink-plugins/Console
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/jdbc/jdbcsource_to_console.conf b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/jdbc/jdbcsource_to_console.conf
new file mode 100644
index 00000000000..6862abc04c2
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/jdbc/jdbcsource_to_console.conf
@@ -0,0 +1,53 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in SeaTunnel config
+######
+
+env {
+  # You can set flink configuration here
+  execution.parallelism = 1
+  job.mode = "BATCH"
+  #execution.checkpoint.interval = 10000
+  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+  # This is an example source plugin **only for test and demonstrate the feature source plugin**
+  Jdbc {
+    driver = org.postgresql.Driver
+    url = "jdbc:postgresql://postgresql:5432/test?loggerLevel=OFF"
+    user = test
+    password = "test"
+    query = "select * from test"
+  }
+
+  # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
+  # please go to https://seatunnel.apache.org/docs/flink/configuration/source-plugins/Fake
+}
+
+transform {
+
+  # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
+  # please go to https://seatunnel.apache.org/docs/flink/configuration/transform-plugins/Sql
+}
+
+sink {
+  Console {}
+  # If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
+  # please go to https://seatunnel.apache.org/docs/flink/configuration/sink-plugins/Console
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-spark-e2e/pom.xml b/seatunnel-e2e/seatunnel-spark-e2e/pom.xml
index e6a57972901..85a51616522 100644
--- a/seatunnel-e2e/seatunnel-spark-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-spark-e2e/pom.xml
@@ -41,6 +41,17 @@
             <groupId>org.testcontainers</groupId>
             <artifactId>testcontainers</artifactId>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>postgresql</artifactId>
+            <version>1.17.3</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.postgresql</groupId>
+            <artifactId>postgresql</artifactId>
+            <scope>test</scope>
+        </dependency>
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/jdbc/FakeSourceToJdbcIT.java b/seatunnel-e2e/seatunnel-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/jdbc/FakeSourceToJdbcIT.java
new file mode 100644
index 00000000000..061de4e1ab2
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/jdbc/FakeSourceToJdbcIT.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.spark.jdbc;
+
+import org.apache.seatunnel.e2e.spark.SparkContainer;
+
+import com.google.common.collect.Lists;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.PostgreSQLContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerImageName;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.List;
+import java.util.stream.Stream;
+
+public class FakeSourceToJdbcIT extends SparkContainer {
+    private static final Logger LOGGER = LoggerFactory.getLogger(FakeSourceToJdbcIT.class);
+    private PostgreSQLContainer<?> psl;
+
+    @SuppressWarnings("checkstyle:MagicNumber")
+    @BeforeEach
+    public void startPostgreSqlContainer() throws InterruptedException, ClassNotFoundException, SQLException {
+        psl = new PostgreSQLContainer<>(DockerImageName.parse("postgres:alpine3.16"))
+            .withNetwork(NETWORK)
+            .withNetworkAliases("postgresql")
+            .withLogConsumer(new Slf4jLogConsumer(LOGGER));
+        Startables.deepStart(Stream.of(psl)).join();
+        LOGGER.info("PostgreSql container started");
+        Thread.sleep(5000L);
+        Class.forName(psl.getDriverClassName());
+        initializeJdbcTable();
+    }
+
+    private void initializeJdbcTable() {
+        try (Connection connection = DriverManager.getConnection(psl.getJdbcUrl(), psl.getUsername(), psl.getPassword())) {
+            Statement statement = connection.createStatement();
+            String sql = "CREATE TABLE test (\n" +
+                " name varchar(255) NOT NULL\n" +
+                ")";
+            statement.execute(sql);
+        } catch (SQLException e) {
+            throw new RuntimeException("Initializing PostgreSql table failed!", e);
+        }
+    }
+
+    @Test
+    public void testFakeSourceToJdbcSink() throws SQLException, IOException, InterruptedException {
+        Container.ExecResult execResult = executeSeaTunnelSparkJob("/jdbc/fakesource_to_jdbc.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+        String sql = "select * from test";
+        try (Connection connection = DriverManager.getConnection(psl.getJdbcUrl(), psl.getUsername(), psl.getPassword())) {
+            Statement statement = connection.createStatement();
+            ResultSet resultSet = statement.executeQuery(sql);
+            List<String> result = Lists.newArrayList();
+            while (resultSet.next()) {
+                result.add(resultSet.getString("name"));
+            }
+            Assertions.assertFalse(result.isEmpty());
+        }
+    }
+
+    @AfterEach
+    public void closePostgreSqlContainer() {
+        if (psl != null) {
+            psl.stop();
+        }
+    }
+}
diff --git a/seatunnel-e2e/seatunnel-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/jdbc/JdbcSourceToConsoleIT.java b/seatunnel-e2e/seatunnel-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/jdbc/JdbcSourceToConsoleIT.java
new file mode 100644
index 00000000000..f430070197d
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/jdbc/JdbcSourceToConsoleIT.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.spark.jdbc;
+
+import org.apache.seatunnel.e2e.spark.SparkContainer;
+
+import com.google.common.collect.Lists;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.PostgreSQLContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerImageName;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.stream.Stream;
+
+public class JdbcSourceToConsoleIT extends SparkContainer {
+    private static final Logger LOGGER = LoggerFactory.getLogger(JdbcSourceToConsoleIT.class);
+    private PostgreSQLContainer<?> psl;
+
+    @SuppressWarnings("checkstyle:MagicNumber")
+    @BeforeEach
+    public void startPostgreSqlContainer() throws InterruptedException, ClassNotFoundException, SQLException {
+        psl = new PostgreSQLContainer<>(DockerImageName.parse("postgres:alpine3.16"))
+            .withNetwork(NETWORK)
+            .withNetworkAliases("postgresql")
+            .withLogConsumer(new Slf4jLogConsumer(LOGGER));
+        psl.setPortBindings(Lists.newArrayList("33306:5432"));
+        Startables.deepStart(Stream.of(psl)).join();
+        LOGGER.info("PostgreSql container started");
+        Thread.sleep(5000L);
+        Class.forName(psl.getDriverClassName());
+        initializeJdbcTable();
+        batchInsertData();
+    }
+
+    private void initializeJdbcTable() {
+        try (Connection connection = DriverManager.getConnection(psl.getJdbcUrl(), psl.getUsername(), psl.getPassword())) {
+            Statement statement = connection.createStatement();
+            String sql = "CREATE TABLE test (\n" +
+                " name varchar(255) NOT NULL,\n" +
+                " age int NOT NULL\n" +
+                ")";
+            statement.executeUpdate(sql);
+        } catch (SQLException e) {
+            throw new RuntimeException("Initializing PostgreSql table failed!", e);
+        }
+    }
+
+    @SuppressWarnings("checkstyle:MagicNumber")
+    private void batchInsertData() throws SQLException {
+        try (Connection connection = DriverManager.getConnection(psl.getJdbcUrl(), psl.getUsername(), psl.getPassword())) {
+            String sql = "insert into test(name,age) values(?,?)";
+            connection.setAutoCommit(false);
+            PreparedStatement preparedStatement = connection.prepareStatement(sql);
+            for (int i = 0; i < 10; i++) {
+                preparedStatement.setString(1, "Mike");
+                preparedStatement.setInt(2, 20);
+                preparedStatement.addBatch();
+            }
+            preparedStatement.executeBatch();
+            connection.commit();
+        } catch (SQLException e) {
+            throw new RuntimeException("Batch insert data failed!", e);
+        }
+    }
+
+    @Test
+    public void testJdbcSourceToConsoleSink() throws SQLException, IOException, InterruptedException {
+        Container.ExecResult execResult = executeSeaTunnelSparkJob("/jdbc/jdbcsource_to_console.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+    }
+
+    @AfterEach
+    public void closePostgreSqlContainer() {
+        if (psl != null) {
+            psl.stop();
+        }
+    }
+}
diff --git a/seatunnel-e2e/seatunnel-spark-e2e/src/test/resources/jdbc/fakesource_to_jdbc.conf b/seatunnel-e2e/seatunnel-spark-e2e/src/test/resources/jdbc/fakesource_to_jdbc.conf
new file mode 100644
index 00000000000..8aae4fe31c5
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-spark-e2e/src/test/resources/jdbc/fakesource_to_jdbc.conf
@@ -0,0 +1,60 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in SeaTunnel config
+######
+
+env {
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 2
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+}
+
+source {
+  # This is an example source plugin **only for test and demonstrate the feature source plugin**
+  Fake {
+    result_table_name = "fake"
+  }
+
+  # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
+  # please go to https://seatunnel.apache.org/docs/flink/configuration/source-plugins/Fake
+}
+
+transform {
+  # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
+  # please go to https://seatunnel.apache.org/docs/flink/configuration/transform-plugins/Sql
+}
+
+sink {
+  jdbc {
+    driver = org.postgresql.Driver
+    saveMode = "update",
+    truncate = "true",
+    url = "jdbc:postgresql://postgresql:5432/test?loggerLevel=OFF",
+    user = "test",
+    password = "test",
+    dbTable = "fake",
+    customUpdateStmt = "insert into test(name) values(?)"
+    jdbc.connect_timeout = 10000
+    jdbc.socket_timeout = 10000
+  }
+
+  # If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
+  # please go to https://seatunnel.apache.org/docs/flink/configuration/sink-plugins/Console
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-spark-e2e/src/test/resources/jdbc/jdbcsource_to_console.conf b/seatunnel-e2e/seatunnel-spark-e2e/src/test/resources/jdbc/jdbcsource_to_console.conf
new file mode 100644
index 00000000000..d9555f20be5
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-spark-e2e/src/test/resources/jdbc/jdbcsource_to_console.conf
@@ -0,0 +1,53 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in SeaTunnel config
+######
+
+env {
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 2
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+}
+
+source {
+  # This is an example source plugin **only for test and demonstrate the feature source plugin**
+  jdbc {
+    driver = org.postgresql.Driver
+    url = "jdbc:postgresql://postgresql:5432/test?loggerLevel=OFF",
+    table = "test"
+    result_table_name = "test_log"
+    user = "test"
+    password = "test"
+  }
+  # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
+  # please go to https://seatunnel.apache.org/docs/flink/configuration/source-plugins/Fake
+}
+
+transform {
+  # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
+  # please go to https://seatunnel.apache.org/docs/flink/configuration/transform-plugins/Sql
+}
+
+sink {
+  Console {}
+
+  # If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
+  # please go to https://seatunnel.apache.org/docs/flink/configuration/sink-plugins/Console
+}
\ No newline at end of file
diff --git a/tools/dependencies/known-dependencies.txt b/tools/dependencies/known-dependencies.txt
index 3aeb12dbaf1..4db58ce61c7 100755
--- a/tools/dependencies/known-dependencies.txt
+++ b/tools/dependencies/known-dependencies.txt
@@ -54,7 +54,6 @@ calcite-core-1.29.0.jar
 calcite-druid-1.29.0.jar
 calcite-linq4j-1.29.0.jar
 checker-qual-3.10.0.jar
-checker-qual-3.4.0.jar
 chill-java-0.9.3.jar
 chill_2.11-0.9.3.jar
 classmate-1.1.0.jar
@@ -731,4 +730,5 @@ zookeeper-jute-3.5.9.jar
 zstd-jni-1.3.3-1.jar
 zstd-jni-1.4.3-1.jar
 jakarta.activation-api-1.2.1.jar
-jakarta.xml.bind-api-2.3.2.jar
\ No newline at end of file
+jakarta.xml.bind-api-2.3.2.jar
+postgresql-42.3.3.jar
\ No newline at end of file