diff --git a/docs/en/connector-v2/sink/Jdbc.md b/docs/en/connector-v2/sink/Jdbc.md index 952521509d7..76705606be0 100644 --- a/docs/en/connector-v2/sink/Jdbc.md +++ b/docs/en/connector-v2/sink/Jdbc.md @@ -132,6 +132,7 @@ there are some reference value for params above. | GBase8a | com.gbase.jdbc.Driver | jdbc:gbase://e2e_gbase8aDb:5258/test | / | https://www.gbase8.cn/wp-content/uploads/2020/10/gbase-connector-java-8.3.81.53-build55.5.7-bin_min_mix.jar | | StarRocks | com.mysql.cj.jdbc.Driver | jdbc:mysql://localhost:3306/test | / | https://mvnrepository.com/artifact/mysql/mysql-connector-java | | db2 | com.ibm.db2.jcc.DB2Driver | jdbc:db2://localhost:50000/testdb | com.ibm.db2.jcc.DB2XADataSource | https://mvnrepository.com/artifact/com.ibm.db2.jcc/db2jcc/db2jcc4 | +| Doris | com.mysql.cj.jdbc.Driver | jdbc:mysql://localhost:3306/test | / | https://mvnrepository.com/artifact/mysql/mysql-connector-java | | teradata | com.teradata.jdbc.TeraDriver | jdbc:teradata://localhost/DBS_PORT=1025,DATABASE=test | / | https://mvnrepository.com/artifact/com.teradata.jdbc/terajdbc | ## Example @@ -193,10 +194,10 @@ sink { ### 2.3.0-beta 2022-10-20 - [BugFix] Fix JDBC split exception ([2904](https://github.com/apache/incubator-seatunnel/pull/2904)) -- [Feature] Support Phoenix JDBC Source ([2499](https://github.com/apache/incubator-seatunnel/pull/2499)) -- [Feature] Support SQL Server JDBC Source ([2646](https://github.com/apache/incubator-seatunnel/pull/2646)) -- [Feature] Support Oracle JDBC Source ([2550](https://github.com/apache/incubator-seatunnel/pull/2550)) -- [Feature] Support StarRocks JDBC Source ([3060](https://github.com/apache/incubator-seatunnel/pull/3060)) +- [Feature] Support Phoenix JDBC Sink ([2499](https://github.com/apache/incubator-seatunnel/pull/2499)) +- [Feature] Support SQL Server JDBC Sink ([2646](https://github.com/apache/incubator-seatunnel/pull/2646)) +- [Feature] Support Oracle JDBC Sink ([2550](https://github.com/apache/incubator-seatunnel/pull/2550)) +- [Feature] Support StarRocks JDBC Sink ([3060](https://github.com/apache/incubator-seatunnel/pull/3060)) - [Feature] Support DB2 JDBC Sink ([2410](https://github.com/apache/incubator-seatunnel/pull/2410)) ### next version @@ -205,3 +206,4 @@ sink { - [Feature] Support Teradata JDBC Sink ([3362](https://github.com/apache/incubator-seatunnel/pull/3362)) - [Feature] Support Sqlite JDBC Sink ([3089](https://github.com/apache/incubator-seatunnel/pull/3089)) - [Feature] Support CDC write DELETE/UPDATE/INSERT events ([3378](https://github.com/apache/incubator-seatunnel/issues/3378)) +- [Feature] Support Doris JDBC Sink diff --git a/docs/en/connector-v2/source/Jdbc.md b/docs/en/connector-v2/source/Jdbc.md index 742ba48fd2f..9ad87616703 100644 --- a/docs/en/connector-v2/source/Jdbc.md +++ b/docs/en/connector-v2/source/Jdbc.md @@ -109,6 +109,7 @@ there are some reference value for params above. | starrocks | com.mysql.cj.jdbc.Driver | jdbc:mysql://localhost:3306/test | https://mvnrepository.com/artifact/mysql/mysql-connector-java | | db2 | com.ibm.db2.jcc.DB2Driver | jdbc:db2://localhost:50000/testdb | https://mvnrepository.com/artifact/com.ibm.db2.jcc/db2jcc/db2jcc4 | | tablestore | com.alicloud.openservices.tablestore.jdbc.OTSDriver | "jdbc:ots:http s://myinstance.cn-hangzhou.ots.aliyuncs.com/myinstance" | https://mvnrepository.com/artifact/com.aliyun.openservices/tablestore-jdbc | +| doris | com.mysql.cj.jdbc.Driver | jdbc:mysql://localhost:3306/test | https://mvnrepository.com/artifact/mysql/mysql-connector-java | | teradata | com.teradata.jdbc.TeraDriver | jdbc:teradata://localhost/DBS_PORT=1025,DATABASE=test | https://mvnrepository.com/artifact/com.teradata.jdbc/terajdbc | ## Example @@ -162,3 +163,4 @@ parallel: - [Feature] Support Tablestore Source ([3309](https://github.com/apache/incubator-seatunnel/pull/3309)) - [Feature] Support Teradata JDBC Source ([3362](https://github.com/apache/incubator-seatunnel/pull/3362)) - [Feature] Support JDBC Fetch Size Config ([3478](https://github.com/apache/incubator-seatunnel/pull/3478)) +- [Feature] Support Doris JDBC Source diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MySqlTypeMapper.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MySqlTypeMapper.java index aa8206185e1..84a8e8bdc66 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MySqlTypeMapper.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MySqlTypeMapper.java @@ -112,6 +112,10 @@ public SeaTunnelDataType mapping(ResultSetMetaData metadata, int colIndex) th case MYSQL_BIGINT_UNSIGNED: return new DecimalType(20, 0); case MYSQL_DECIMAL: + if (precision > 38) { + LOG.warn("{} will probably cause value overflow.", MYSQL_DECIMAL); + return new DecimalType(38, 18); + } return new DecimalType(precision, scale); case MYSQL_DECIMAL_UNSIGNED: return new DecimalType(precision + 1, scale); diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcDorisdbIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcDorisdbIT.java new file mode 100644 index 00000000000..5b325a069c3 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcDorisdbIT.java @@ -0,0 +1,315 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc; + +import static org.awaitility.Awaitility.given; + +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.common.utils.ExceptionUtils; +import org.apache.seatunnel.e2e.common.TestResource; +import org.apache.seatunnel.e2e.common.TestSuiteBase; +import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory; +import org.apache.seatunnel.e2e.common.container.TestContainer; +import org.apache.seatunnel.e2e.common.junit.TestContainerExtension; + +import com.google.common.collect.Lists; +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.TestTemplate; +import org.testcontainers.containers.Container; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.lifecycle.Startables; +import org.testcontainers.shaded.org.apache.commons.io.IOUtils; + +import java.io.IOException; +import java.io.InputStream; +import java.math.BigDecimal; +import java.net.MalformedURLException; +import java.net.URL; +import java.net.URLClassLoader; +import java.nio.charset.StandardCharsets; +import java.sql.Connection; +import java.sql.Driver; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import java.util.Properties; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +@Slf4j +@Disabled("Doris docker container is unstable") +public class JdbcDorisdbIT extends TestSuiteBase implements TestResource { + private static final String DOCKER_IMAGE = "taozex/doris:v1.1.1"; + private static final String DRIVER_CLASS = "com.mysql.cj.jdbc.Driver"; + private static final String HOST = "doris_e2e"; + private static final int DOCKER_PORT = 9030; + private static final int PORT = 8960; + + private static final String URL = "jdbc:mysql://%s:" + PORT; + private static final String USERNAME = "root"; + private static final String PASSWORD = ""; + private static final String DATABASE = "test"; + private static final String SOURCE_TABLE = "e2e_table_source"; + private static final String SINK_TABLE = "e2e_table_sink"; + private static final String DRIVER_JAR = "https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.16/mysql-connector-java-8.0.16.jar"; + private static final String COLUMN_STRING = "BIGINT_COL, LARGEINT_COL, SMALLINT_COL, TINYINT_COL, BOOLEAN_COL, DECIMAL_COL, DOUBLE_COL, FLOAT_COL, INT_COL, CHAR_COL, VARCHAR_11_COL, STRING_COL, DATETIME_COL, DATE_COL"; + + private static final String DDL_SOURCE = "create table " + DATABASE + "." + SOURCE_TABLE + " (\n" + + " BIGINT_COL BIGINT,\n" + + " LARGEINT_COL LARGEINT,\n" + + " SMALLINT_COL SMALLINT,\n" + + " TINYINT_COL TINYINT,\n" + + " BOOLEAN_COL BOOLEAN,\n" + + " DECIMAL_COL DECIMAL,\n" + + " DOUBLE_COL DOUBLE,\n" + + " FLOAT_COL FLOAT,\n" + + " INT_COL INT,\n" + + " CHAR_COL CHAR,\n" + + " VARCHAR_11_COL VARCHAR(11),\n" + + " STRING_COL STRING,\n" + + " DATETIME_COL DATETIME,\n" + + " DATE_COL DATE\n" + + ")ENGINE=OLAP\n" + + "DUPLICATE KEY(`BIGINT_COL`)\n" + + "DISTRIBUTED BY HASH(`BIGINT_COL`) BUCKETS 1\n" + + "PROPERTIES (\n" + + "\"replication_allocation\" = \"tag.location.default: 1\"" + + ")"; + + private static final String DDL_SINK = "create table " + DATABASE + "." + SINK_TABLE + " (\n" + + " BIGINT_COL BIGINT,\n" + + " LARGEINT_COL LARGEINT,\n" + + " SMALLINT_COL SMALLINT,\n" + + " TINYINT_COL TINYINT,\n" + + " BOOLEAN_COL BOOLEAN,\n" + + " DECIMAL_COL DECIMAL,\n" + + " DOUBLE_COL DOUBLE,\n" + + " FLOAT_COL FLOAT,\n" + + " INT_COL INT,\n" + + " CHAR_COL CHAR,\n" + + " VARCHAR_11_COL VARCHAR(11),\n" + + " STRING_COL STRING,\n" + + " DATETIME_COL DATETIME,\n" + + " DATE_COL DATE\n" + + ")ENGINE=OLAP\n" + + "DUPLICATE KEY(`BIGINT_COL`)\n" + + "DISTRIBUTED BY HASH(`BIGINT_COL`) BUCKETS 1\n" + + "PROPERTIES (\n" + + "\"replication_allocation\" = \"tag.location.default: 1\"" + + ")"; + + private static final String INIT_DATA_SQL = "insert into " + DATABASE + "." + SOURCE_TABLE + " (\n" + + " BIGINT_COL,\n" + + " LARGEINT_COL,\n" + + " SMALLINT_COL,\n" + + " TINYINT_COL,\n" + + " BOOLEAN_COL,\n" + + " DECIMAL_COL,\n" + + " DOUBLE_COL,\n" + + " FLOAT_COL,\n" + + " INT_COL,\n" + + " CHAR_COL,\n" + + " VARCHAR_11_COL,\n" + + " STRING_COL,\n" + + " DATETIME_COL,\n" + + " DATE_COL\n" + + ")values(\n" + + "\t?,?,?,?,?,?,?,?,?,?,?,?,?,?\n" + + ")"; + + private Connection jdbcConnection; + private GenericContainer dorisServer; + private static final List TEST_DATASET = generateTestDataSet(); + + @TestContainerExtension + private final ContainerExtendedFactory extendedFactory = container -> { + Container.ExecResult extraCommands = container.execInContainer("bash", "-c", "mkdir -p /tmp/seatunnel/plugins/Jdbc/lib && cd /tmp/seatunnel/plugins/Jdbc/lib && curl -O " + DRIVER_JAR); + Assertions.assertEquals(0, extraCommands.getExitCode()); + }; + + @BeforeAll + @Override + public void startUp() throws Exception { + dorisServer = new GenericContainer<>(DOCKER_IMAGE) + .withNetwork(NETWORK) + .withNetworkAliases(HOST) + .withLogConsumer(new Slf4jLogConsumer(log)); + dorisServer.setPortBindings(Lists.newArrayList( + String.format("%s:%s", PORT, DOCKER_PORT))); + Startables.deepStart(Stream.of(dorisServer)).join(); + log.info("Doris container started"); + // wait to add BE + Thread.sleep(600000); + // wait for doris fully start + given().ignoreExceptions() + .await() + .atMost(600, TimeUnit.SECONDS) + .untilAsserted(this::initializeJdbcConnection); + initializeJdbcTable(); + batchInsertData(); + } + + private static List generateTestDataSet() { + + List rows = new ArrayList<>(); + for (int i = 0; i < 100; i++) { + SeaTunnelRow row = new SeaTunnelRow( + new Object[]{ + Long.valueOf(i), + Long.valueOf(1123456), + Short.parseShort("1"), + Byte.parseByte("1"), + Boolean.FALSE, + BigDecimal.valueOf(2222243, 1), + Double.parseDouble("2222243.2222243"), + Float.parseFloat("222224"), + Integer.parseInt("1"), + "a", + "VARCHAR_COL", + "STRING_COL", + "2022-03-02 13:24:45", + "2022-03-02" + }); + rows.add(row); + } + return rows; + } + + @AfterAll + @Override + public void tearDown() throws Exception { + if (jdbcConnection != null) { + jdbcConnection.close(); + } + if (dorisServer != null) { + dorisServer.close(); + } + } + + @TestTemplate + public void testDorisSink(TestContainer container) throws IOException, InterruptedException { + Container.ExecResult execResult = container.executeJob("/jdbc_doris_source_and_sink.conf"); + Assertions.assertEquals(0, execResult.getExitCode()); + try { + assertHasData(SINK_TABLE); + + String sourceSql = String.format("select * from %s.%s", DATABASE, SOURCE_TABLE); + String sinkSql = String.format("select * from %s.%s", DATABASE, SINK_TABLE); + List columnList = Arrays.stream(COLUMN_STRING.split(",")).map(x -> x.trim()).collect(Collectors.toList()); + Statement sourceStatement = jdbcConnection.createStatement(); + Statement sinkStatement = jdbcConnection.createStatement(); + ResultSet sourceResultSet = sourceStatement.executeQuery(sourceSql); + ResultSet sinkResultSet = sinkStatement.executeQuery(sinkSql); + Assertions.assertEquals(sourceResultSet.getMetaData().getColumnCount(), sinkResultSet.getMetaData().getColumnCount()); + while (sourceResultSet.next()) { + if (sinkResultSet.next()) { + for (String column : columnList) { + Object source = sourceResultSet.getObject(column); + Object sink = sinkResultSet.getObject(column); + if (!Objects.deepEquals(source, sink)) { + InputStream sourceAsciiStream = sourceResultSet.getBinaryStream(column); + InputStream sinkAsciiStream = sinkResultSet.getBinaryStream(column); + String sourceValue = IOUtils.toString(sourceAsciiStream, StandardCharsets.UTF_8); + String sinkValue = IOUtils.toString(sinkAsciiStream, StandardCharsets.UTF_8); + Assertions.assertEquals(sourceValue, sinkValue); + } + } + } + } + //Check the row numbers is equal + sourceResultSet.last(); + sinkResultSet.last(); + Assertions.assertEquals(sourceResultSet.getRow(), sinkResultSet.getRow()); + clearSinkTable(); + } catch (Exception e) { + throw new RuntimeException("Get doris connection error", e); + } + } + + private void initializeJdbcConnection() throws SQLException, ClassNotFoundException, MalformedURLException, InstantiationException, IllegalAccessException { + URLClassLoader urlClassLoader = new URLClassLoader(new URL[]{new URL(DRIVER_JAR)}, JdbcDorisdbIT.class.getClassLoader()); + Thread.currentThread().setContextClassLoader(urlClassLoader); + Driver driver = (Driver) urlClassLoader.loadClass(DRIVER_CLASS).newInstance(); + Properties props = new Properties(); + props.put("user", USERNAME); + props.put("password", PASSWORD); + jdbcConnection = driver.connect(String.format(URL, dorisServer.getHost()), props); + } + + private void initializeJdbcTable() { + try (Statement statement = jdbcConnection.createStatement()) { + // create databases + statement.execute("create database test"); + // create source table + statement.execute(DDL_SOURCE); + // create sink table + statement.execute(DDL_SINK); + } catch (SQLException e) { + throw new RuntimeException("Initializing table failed!", e); + } + } + + private void batchInsertData() { + List rows = TEST_DATASET; + try { + jdbcConnection.setAutoCommit(false); + try (PreparedStatement preparedStatement = jdbcConnection.prepareStatement(INIT_DATA_SQL)) { + for (int i = 0; i < rows.size(); i++) { + for (int index = 0; index < rows.get(i).getFields().length; index++) { + preparedStatement.setObject(index + 1, rows.get(i).getFields()[index]); + } + preparedStatement.addBatch(); + } + preparedStatement.executeBatch(); + } + jdbcConnection.commit(); + } catch (Exception exception) { + log.error(ExceptionUtils.getMessage(exception)); + throw new RuntimeException("Get connection error", exception); + } + } + + private void assertHasData(String table) { + try (Statement statement = jdbcConnection.createStatement()) { + String sql = String.format("select * from %s.%s limit 1", DATABASE, table); + ResultSet source = statement.executeQuery(sql); + Assertions.assertTrue(source.next()); + } catch (Exception e) { + throw new RuntimeException("Test doris server image error", e); + } + } + + private void clearSinkTable() { + try (Statement statement = jdbcConnection.createStatement()) { + statement.execute(String.format("TRUNCATE TABLE %s.%s", DATABASE, SINK_TABLE)); + } catch (SQLException e) { + throw new RuntimeException("Test doris server image error", e); + } + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/resources/jdbc_doris_source_and_sink.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/resources/jdbc_doris_source_and_sink.conf new file mode 100644 index 00000000000..7a2da5fbad0 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/resources/jdbc_doris_source_and_sink.conf @@ -0,0 +1,44 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env { + execution.parallelism = 1 + job.mode = "BATCH" +} + +source { + Jdbc { + driver = com.mysql.cj.jdbc.Driver + url = "jdbc:mysql://doris_e2e:9030" + user = root + password = "" + query = "select BIGINT_COL, LARGEINT_COL, SMALLINT_COL, TINYINT_COL, BOOLEAN_COL, DECIMAL_COL, DOUBLE_COL, FLOAT_COL, INT_COL, CHAR_COL, VARCHAR_11_COL, STRING_COL, DATETIME_COL, DATE_COL from `test`.`e2e_table_source`" + } +} + +transform { +} + +sink { + Jdbc { + driver = com.mysql.cj.jdbc.Driver + url = "jdbc:mysql://doris_e2e:9030" + user = root + password = "" + query = "INSERT INTO `test`.`e2e_table_sink` (BIGINT_COL, LARGEINT_COL, SMALLINT_COL, TINYINT_COL, BOOLEAN_COL, DECIMAL_COL, DOUBLE_COL, FLOAT_COL, INT_COL, CHAR_COL, VARCHAR_11_COL, STRING_COL, DATETIME_COL, DATE_COL) VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?)" + } +} \ No newline at end of file