diff --git a/docs/en/connector-v2/sink/Jdbc.md b/docs/en/connector-v2/sink/Jdbc.md index f145cc220f2..af586d5ecb5 100644 --- a/docs/en/connector-v2/sink/Jdbc.md +++ b/docs/en/connector-v2/sink/Jdbc.md @@ -128,6 +128,7 @@ there are some reference value for params above. | Phoenix | org.apache.phoenix.queryserver.client.Driver | jdbc:phoenix:thin:url=http://localhost:8765;serialization=PROTOBUF | / | https://mvnrepository.com/artifact/com.aliyun.phoenix/ali-phoenix-shaded-thin-client | | SQL Server | com.microsoft.sqlserver.jdbc.SQLServerDriver | jdbc:sqlserver://localhost:1433 | com.microsoft.sqlserver.jdbc.SQLServerXADataSource | https://mvnrepository.com/artifact/com.microsoft.sqlserver/mssql-jdbc | | Oracle | oracle.jdbc.OracleDriver | jdbc:oracle:thin:@localhost:1521/xepdb1 | oracle.jdbc.xa.OracleXADataSource | https://mvnrepository.com/artifact/com.oracle.database.jdbc/ojdbc8 | +| sqlite | org.sqlite.JDBC | jdbc:sqlite:test.db | / | https://mvnrepository.com/artifact/org.xerial/sqlite-jdbc | | GBase8a | com.gbase.jdbc.Driver | jdbc:gbase://e2e_gbase8aDb:5258/test | / | https://www.gbase8.cn/wp-content/uploads/2020/10/gbase-connector-java-8.3.81.53-build55.5.7-bin_min_mix.jar | | StarRocks | com.mysql.cj.jdbc.Driver | jdbc:mysql://localhost:3306/test | / | https://mvnrepository.com/artifact/mysql/mysql-connector-java | | db2 | com.ibm.db2.jcc.DB2Driver | jdbc:db2://localhost:50000/testdb | com.ibm.db2.jcc.DB2XADataSource | https://mvnrepository.com/artifact/com.ibm.db2.jcc/db2jcc/db2jcc4 | @@ -199,4 +200,5 @@ sink { ### next version +- [Feature] Support Sqlite JDBC Sink ([3089](https://github.com/apache/incubator-seatunnel/pull/3089)) - [Feature] Support CDC write DELETE/UPDATE/INSERT events ([3378](https://github.com/apache/incubator-seatunnel/issues/3378)) \ No newline at end of file diff --git a/docs/en/connector-v2/source/Jdbc.md b/docs/en/connector-v2/source/Jdbc.md index 20bb0bb1e16..e6c1c08a92e 100644 --- a/docs/en/connector-v2/source/Jdbc.md +++ b/docs/en/connector-v2/source/Jdbc.md @@ -104,6 +104,7 @@ there are some reference value for params above. | phoenix | org.apache.phoenix.queryserver.client.Driver | jdbc:phoenix:thin:url=http://localhost:8765;serialization=PROTOBUF | https://mvnrepository.com/artifact/com.aliyun.phoenix/ali-phoenix-shaded-thin-client | | sqlserver | com.microsoft.sqlserver.jdbc.SQLServerDriver | jdbc:sqlserver://localhost:1433 | https://mvnrepository.com/artifact/com.microsoft.sqlserver/mssql-jdbc | | oracle | oracle.jdbc.OracleDriver | jdbc:oracle:thin:@localhost:1521/xepdb1 | https://mvnrepository.com/artifact/com.oracle.database.jdbc/ojdbc8 | +| sqlite | org.sqlite.JDBC | jdbc:sqlite:test.db | https://mvnrepository.com/artifact/org.xerial/sqlite-jdbc | | gbase8a | com.gbase.jdbc.Driver | jdbc:gbase://e2e_gbase8aDb:5258/test | https://www.gbase8.cn/wp-content/uploads/2020/10/gbase-connector-java-8.3.81.53-build55.5.7-bin_min_mix.jar | | starrocks | com.mysql.cj.jdbc.Driver | jdbc:mysql://localhost:3306/test | https://mvnrepository.com/artifact/mysql/mysql-connector-java | | db2 | com.ibm.db2.jcc.DB2Driver | jdbc:db2://localhost:50000/testdb | https://mvnrepository.com/artifact/com.ibm.db2.jcc/db2jcc/db2jcc4 | @@ -156,5 +157,6 @@ parallel: ### next version - [BugFix] Fix jdbc split bug ([3220](https://github.com/apache/incubator-seatunnel/pull/3220)) +- [Feature] Support Sqlite JDBC Source ([3089](https://github.com/apache/incubator-seatunnel/pull/3089)) - [Feature] Support Tablestore Source ([3309](https://github.com/apache/incubator-seatunnel/pull/3309)) - [Feature] Support JDBC Fetch Size Config ([3478](https://github.com/apache/incubator-seatunnel/pull/3478)) diff --git a/pom.xml b/pom.xml index 726327b142c..1785241fe33 100644 --- a/pom.xml +++ b/pom.xml @@ -841,12 +841,6 @@ - - - net.alchim31.maven - scala-maven-plugin - - org.apache.maven.plugins maven-surefire-plugin diff --git a/seatunnel-connectors-v2/connector-jdbc/pom.xml b/seatunnel-connectors-v2/connector-jdbc/pom.xml index cd4a5f221e4..3816744227f 100644 --- a/seatunnel-connectors-v2/connector-jdbc/pom.xml +++ b/seatunnel-connectors-v2/connector-jdbc/pom.xml @@ -36,7 +36,9 @@ 9.2.1.jre8 5.2.5-HBase-2.x 12.2.0.1 + 3.39.3.0 db2jcc4 + 3.39.3.0 5.13.9 @@ -78,20 +80,24 @@ ${oracle.version} provided + + org.xerial + sqlite-jdbc + ${sqlite.version} + provided + com.ibm.db2.jcc db2jcc ${db2.version} provided - com.aliyun.openservices tablestore-jdbc ${tablestore.version} provided - @@ -129,6 +135,10 @@ com.oracle.database.jdbc ojdbc8 + + org.xerial + sqlite-jdbc + com.ibm.db2.jcc db2jcc diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcConfig.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcConfig.java index 12eeac7aa9c..3db3bdc5e41 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcConfig.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcConfig.java @@ -111,6 +111,7 @@ public static JdbcConnectionOptions buildJdbcConnectionOptions(Config config) { jdbcOptions.transactionTimeoutSec = config.getInt(JdbcConfig.TRANSACTION_TIMEOUT_SEC.key()); } } + return jdbcOptions; } } diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlite/SqliteDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlite/SqliteDialect.java new file mode 100644 index 00000000000..bf361dc8ccb --- /dev/null +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlite/SqliteDialect.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.sqlite; + +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper; + +import java.util.Arrays; +import java.util.Optional; +import java.util.stream.Collectors; + +public class SqliteDialect implements JdbcDialect { + @Override + public String dialectName() { + return "Sqlite"; + } + + @Override + public JdbcRowConverter getRowConverter() { + return new SqliteJdbcRowConverter(); + } + + @Override + public JdbcDialectTypeMapper getJdbcDialectTypeMapper() { + return new SqliteTypeMapper(); + } + + @Override + public String quoteIdentifier(String identifier) { + return "`" + identifier + "`"; + } + + @Override + public Optional getUpsertStatement(String tableName, String[] fieldNames, String[] uniqueKeyFields) { + String updateClause = Arrays.stream(fieldNames) + .map(fieldName -> quoteIdentifier(fieldName) + "=VALUES(" + quoteIdentifier(fieldName) + ")") + .collect(Collectors.joining(", ")); + + String conflictFields = Arrays.stream(uniqueKeyFields) + .map(this::quoteIdentifier) + .collect(Collectors.joining(",")); + + String upsertSQL = getInsertIntoStatement(tableName, fieldNames) + " ON CONFLICT(" + conflictFields + ") DO UPDATE SET " + updateClause; + return Optional.of(upsertSQL); + } +} diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlite/SqliteDialectFactory.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlite/SqliteDialectFactory.java new file mode 100644 index 00000000000..e9639ec0c28 --- /dev/null +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlite/SqliteDialectFactory.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.sqlite; + +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectFactory; + +import com.google.auto.service.AutoService; + +/** + * Factory for {@link SqliteDialect}. + */ + +@AutoService(JdbcDialectFactory.class) +public class SqliteDialectFactory implements JdbcDialectFactory { + @Override + public boolean acceptsURL(String url) { + return url.startsWith("jdbc:sqlite:"); + } + + @Override + public JdbcDialect create() { + return new SqliteDialect(); + } +} diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlite/SqliteJdbcRowConverter.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlite/SqliteJdbcRowConverter.java new file mode 100644 index 00000000000..1e56c5d430f --- /dev/null +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlite/SqliteJdbcRowConverter.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.sqlite; + +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.AbstractJdbcRowConverter; + +public class SqliteJdbcRowConverter extends AbstractJdbcRowConverter { + + @Override + public String converterName() { + return "Sqlite"; + } + +} diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlite/SqliteTypeMapper.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlite/SqliteTypeMapper.java new file mode 100644 index 00000000000..87681aa7e84 --- /dev/null +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlite/SqliteTypeMapper.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.sqlite; + +import org.apache.seatunnel.api.table.type.BasicType; +import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper; + +import lombok.extern.slf4j.Slf4j; + +import java.sql.ResultSetMetaData; +import java.sql.SQLException; + +@Slf4j +public class SqliteTypeMapper implements JdbcDialectTypeMapper { + + // ============================data types===================== + + private static final String SQLITE_UNKNOWN = "UNKNOWN"; + private static final String SQLITE_BIT = "BIT"; + private static final String SQLITE_BOOLEAN = "BOOLEAN"; + + // -------------------------integer---------------------------- + private static final String SQLITE_TINYINT = "TINYINT"; + private static final String SQLITE_TINYINT_UNSIGNED = "TINYINT UNSIGNED"; + private static final String SQLITE_SMALLINT = "SMALLINT"; + private static final String SQLITE_SMALLINT_UNSIGNED = "SMALLINT UNSIGNED"; + private static final String SQLITE_MEDIUMINT = "MEDIUMINT"; + private static final String SQLITE_MEDIUMINT_UNSIGNED = "MEDIUMINT UNSIGNED"; + private static final String SQLITE_INT = "INT"; + private static final String SQLITE_INT_UNSIGNED = "INT UNSIGNED"; + private static final String SQLITE_INTEGER = "INTEGER"; + private static final String SQLITE_INTEGER_UNSIGNED = "INTEGER UNSIGNED"; + private static final String SQLITE_BIGINT = "BIGINT"; + private static final String SQLITE_BIGINT_UNSIGNED = "BIGINT UNSIGNED"; + private static final String SQLITE_DECIMAL = "DECIMAL"; + private static final String SQLITE_DECIMAL_UNSIGNED = "DECIMAL UNSIGNED"; + private static final String SQLITE_FLOAT = "FLOAT"; + private static final String SQLITE_FLOAT_UNSIGNED = "FLOAT UNSIGNED"; + private static final String SQLITE_DOUBLE = "DOUBLE"; + private static final String SQLITE_DOUBLE_PRECISION = "DOUBLE PRECISION"; + private static final String SQLITE_DOUBLE_UNSIGNED = "DOUBLE UNSIGNED"; + private static final String SQLITE_NUMERIC = "NUMERIC"; + private static final String SQLITE_REAL = "REAL"; + + // -------------------------text---------------------------- + private static final String SQLITE_CHAR = "CHAR"; + private static final String SQLITE_CHARACTER = "CHARACTER"; + private static final String SQLITE_VARYING_CHARACTER = "VARYING_CHARACTER"; + private static final String SQLITE_NATIVE_CHARACTER = "NATIVE_CHARACTER"; + private static final String SQLITE_NCHAR = "NCHAR"; + private static final String SQLITE_VARCHAR = "VARCHAR"; + private static final String SQLITE_LONGVARCHAR = "LONGVARCHAR"; + private static final String SQLITE_LONGNVARCHAR = "LONGNVARCHAR"; + private static final String SQLITE_NVARCHAR = "NVARCHAR"; + private static final String SQLITE_TINYTEXT = "TINYTEXT"; + private static final String SQLITE_MEDIUMTEXT = "MEDIUMTEXT"; + private static final String SQLITE_TEXT = "TEXT"; + private static final String SQLITE_LONGTEXT = "LONGTEXT"; + private static final String SQLITE_JSON = "JSON"; + private static final String SQLITE_CLOB = "CLOB"; + + // ------------------------------time(text)------------------------- + private static final String SQLITE_DATE = "DATE"; + private static final String SQLITE_DATETIME = "DATETIME"; + private static final String SQLITE_TIME = "TIME"; + private static final String SQLITE_TIMESTAMP = "TIMESTAMP"; + + // ------------------------------blob------------------------- + private static final String SQLITE_TINYBLOB = "TINYBLOB"; + private static final String SQLITE_MEDIUMBLOB = "MEDIUMBLOB"; + private static final String SQLITE_BLOB = "BLOB"; + private static final String SQLITE_LONGBLOB = "LONGBLOB"; + private static final String SQLITE_BINARY = "BINARY"; + private static final String SQLITE_VARBINARY = "VARBINARY"; + private static final String SQLITE_LONGVARBINARY = "LONGVARBINARY"; + + @Override + public SeaTunnelDataType mapping(ResultSetMetaData metadata, int colIndex) throws SQLException { + String columnTypeName = metadata.getColumnTypeName(colIndex).toUpperCase().trim(); + switch (columnTypeName) { + case SQLITE_BIT: + case SQLITE_BOOLEAN: + return BasicType.BOOLEAN_TYPE; + case SQLITE_TINYINT: + case SQLITE_TINYINT_UNSIGNED: + case SQLITE_SMALLINT: + case SQLITE_SMALLINT_UNSIGNED: + return BasicType.SHORT_TYPE; + case SQLITE_MEDIUMINT: + case SQLITE_MEDIUMINT_UNSIGNED: + case SQLITE_INT: + case SQLITE_INTEGER: + return BasicType.INT_TYPE; + case SQLITE_INT_UNSIGNED: + case SQLITE_INTEGER_UNSIGNED: + case SQLITE_BIGINT: + case SQLITE_BIGINT_UNSIGNED: + case SQLITE_NUMERIC: + return BasicType.LONG_TYPE; + case SQLITE_DECIMAL: + case SQLITE_DECIMAL_UNSIGNED: + case SQLITE_DOUBLE: + case SQLITE_DOUBLE_PRECISION: + case SQLITE_REAL: + return BasicType.DOUBLE_TYPE; + case SQLITE_FLOAT: + return BasicType.FLOAT_TYPE; + case SQLITE_FLOAT_UNSIGNED: + log.warn("{} will probably cause value overflow.", SQLITE_FLOAT_UNSIGNED); + return BasicType.FLOAT_TYPE; + case SQLITE_DOUBLE_UNSIGNED: + log.warn("{} will probably cause value overflow.", SQLITE_DOUBLE_UNSIGNED); + return BasicType.DOUBLE_TYPE; + case SQLITE_CHARACTER: + case SQLITE_VARYING_CHARACTER: + case SQLITE_NATIVE_CHARACTER: + case SQLITE_NVARCHAR: + case SQLITE_NCHAR: + case SQLITE_LONGNVARCHAR: + case SQLITE_LONGVARCHAR: + case SQLITE_CLOB: + case SQLITE_CHAR: + case SQLITE_TINYTEXT: + case SQLITE_MEDIUMTEXT: + case SQLITE_TEXT: + case SQLITE_VARCHAR: + case SQLITE_JSON: + case SQLITE_LONGTEXT: + + case SQLITE_DATE: + case SQLITE_TIME: + case SQLITE_DATETIME: + case SQLITE_TIMESTAMP: + return BasicType.STRING_TYPE; + + case SQLITE_TINYBLOB: + case SQLITE_MEDIUMBLOB: + case SQLITE_BLOB: + case SQLITE_LONGBLOB: + case SQLITE_VARBINARY: + case SQLITE_BINARY: + case SQLITE_LONGVARBINARY: + return PrimitiveByteArrayType.INSTANCE; + + //Doesn't support yet + case SQLITE_UNKNOWN: + default: + final String jdbcColumnName = metadata.getColumnName(colIndex); + throw new UnsupportedOperationException( + String.format( + "Doesn't support SQLite type '%s' on column '%s' yet.", + columnTypeName, jdbcColumnName)); + } + } +} diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/options/JdbcConnectionOptions.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/options/JdbcConnectionOptions.java index 6487c71404f..c10cbf96ed3 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/options/JdbcConnectionOptions.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/options/JdbcConnectionOptions.java @@ -39,6 +39,7 @@ public class JdbcConnectionOptions public int maxRetries = DEFAULT_MAX_RETRIES; public String username; public String password; + public String query; public boolean autoCommit = JdbcConfig.AUTO_COMMIT.defaultValue(); diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/pom.xml b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/pom.xml index f6220d9b07d..c2e7bad81f0 100644 --- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/pom.xml +++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/pom.xml @@ -108,6 +108,11 @@ mssql-jdbc test + + org.xerial + sqlite-jdbc + test + diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcSqliteIT.java b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcSqliteIT.java new file mode 100644 index 00000000000..4206134c238 --- /dev/null +++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcSqliteIT.java @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.e2e.flink.v2.jdbc; + +import org.apache.seatunnel.common.config.CheckConfigUtil; +import org.apache.seatunnel.core.starter.config.ConfigBuilder; +import org.apache.seatunnel.e2e.flink.FlinkContainer; + +import org.apache.seatunnel.shade.com.typesafe.config.Config; + +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.Container; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.utility.MountableFile; + +import java.io.File; +import java.io.IOException; +import java.net.URI; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; + +@Slf4j +public class JdbcSqliteIT extends FlinkContainer { + private String tmpdir; + private Config config; + private static final List> TEST_DATASET = generateTestDataset(); + private static final String THIRD_PARTY_PLUGINS_URL = "https://repo1.maven.org/maven2/org/xerial/sqlite-jdbc/3.39.3.0/sqlite-jdbc-3.39.3.0.jar"; + + private void initTestDb() throws Exception { + URI resource = Objects.requireNonNull(JdbcSqliteIT.class.getResource("/jdbc/init_sql/sqlite_init.conf")).toURI(); + config = new ConfigBuilder(Paths.get(resource)).getConfig(); + CheckConfigUtil.checkAllExists(this.config, "source_table", "sink_table", "type_source_table", + "type_sink_table", "insert_type_source_table_sql", "check_type_sink_table_sql"); + tmpdir = Paths.get(System.getProperty("java.io.tmpdir")).toString(); + Connection connection = null; + try { + Class.forName("org.sqlite.JDBC"); + connection = DriverManager.getConnection("jdbc:sqlite:" + tmpdir + "/test.db", "", ""); + Statement statement = connection.createStatement(); + statement.execute("drop table if exists source"); + statement.execute("drop table if exists sink"); + statement.execute("drop table if exists type_source_table"); + statement.execute("drop table if exists type_sink_table"); + statement.execute(config.getString("source_table")); + statement.execute(config.getString("sink_table")); + statement.execute(config.getString("type_source_table")); + statement.execute(config.getString("type_sink_table")); + statement.execute(config.getString("insert_type_source_table_sql")); + + String sql = "insert into source(age, name) values(?, ?)"; + connection.setAutoCommit(false); + try (PreparedStatement preparedStatement = connection.prepareStatement(sql)) { + for (List row : TEST_DATASET) { + preparedStatement.setInt(1, (Integer) row.get(0)); + preparedStatement.setString(2, (String) row.get(1)); + preparedStatement.addBatch(); + } + preparedStatement.executeBatch(); + } + connection.commit(); + } catch (Exception e) { + if (null != connection) { + try { + connection.rollback(); + } catch (SQLException ex) { + ex.printStackTrace(); + } + } + throw e; + } + } + + private static List> generateTestDataset() { + List> rows = new ArrayList<>(); + for (int i = 1; i <= 100; i++) { + rows.add(Arrays.asList(i, String.format("test_%s", i))); + } + return rows; + } + + @Test + public void testJdbcSqliteSourceAndSinkDataType() throws Exception { + Container.ExecResult execResult = executeSeaTunnelFlinkJob("/jdbc/jdbc_sqlite_source_and_sink_datatype.conf"); + Assertions.assertEquals(0, execResult.getExitCode()); + taskManager.copyFileFromContainer(Paths.get("/sqlite/test.db").toString(), new File(tmpdir + "/test.db").toPath().toString()); + checkSinkDataTypeTable(); + } + + private void checkSinkDataTypeTable() throws Exception { + URI resource = Objects.requireNonNull(JdbcSqliteIT.class.getResource("/jdbc/init_sql/sqlite_init.conf")).toURI(); + config = new ConfigBuilder(Paths.get(resource)).getConfig(); + CheckConfigUtil.checkAllExists(this.config, "source_table", "sink_table", "type_source_table", + "type_sink_table", "insert_type_source_table_sql", "check_type_sink_table_sql"); + + try (Connection connection = DriverManager.getConnection("jdbc:sqlite:" + tmpdir + "/test.db", "", "")) { + Statement statement = connection.createStatement(); + ResultSet resultSet = statement.executeQuery(config.getString("check_type_sink_table_sql")); + resultSet.next(); + Assertions.assertEquals(resultSet.getInt(1), 2); + } + } + + @Test + public void testJdbcSqliteSourceAndSink() throws IOException, InterruptedException, SQLException { + Container.ExecResult execResult = executeSeaTunnelFlinkJob("/jdbc/jdbc_sqlite_source_and_sink.conf"); + Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr()); + taskManager.copyFileFromContainer(Paths.get("/sqlite/test.db").toString(), new File(tmpdir + "/test.db").toPath().toString()); + // query result + String sql = "select age, name from sink order by age asc"; + List> result = new ArrayList<>(); + try (Connection connection = DriverManager.getConnection("jdbc:sqlite:" + tmpdir + "/test.db", "", "")) { + Statement statement = connection.createStatement(); + ResultSet resultSet = statement.executeQuery(sql); + while (resultSet.next()) { + result.add(Arrays.asList( + resultSet.getInt(1), + resultSet.getString(2))); + } + Assertions.assertIterableEquals(TEST_DATASET, result); + } + } + + @AfterEach + public void closeResource() throws SQLException, IOException { + // remove the temp test.db file + Files.deleteIfExists(new File(tmpdir + "/test.db").toPath()); + } + + @Override + protected void executeExtraCommands(GenericContainer container) throws IOException, InterruptedException { + Container.ExecResult extraCommands = container.execInContainer("bash", "-c", "mkdir -p /tmp/seatunnel/plugins/Jdbc/lib && cd /tmp/seatunnel/plugins/Jdbc/lib && curl -O " + THIRD_PARTY_PLUGINS_URL); + Assertions.assertEquals(0, extraCommands.getExitCode()); + + Container.ExecResult mkdirCommands1 = jobManager.execInContainer("bash", "-c", "mkdir -p " + "/sqlite"); + Assertions.assertEquals(0, mkdirCommands1.getExitCode()); + Container.ExecResult mkdirCommands2 = taskManager.execInContainer("bash", "-c", "mkdir -p " + "/sqlite"); + Assertions.assertEquals(0, mkdirCommands2.getExitCode()); + jobManager.execInContainer("bash", "-c", "chmod 777 -R /sqlite"); + taskManager.execInContainer("bash", "-c", "chmod 777 -R /sqlite"); + try { + initTestDb(); + // copy db file to container, dist file path in container is /tmp/seatunnel/data/test.db + jobManager.copyFileToContainer(MountableFile.forHostPath(tmpdir + "/test.db"), "/sqlite/test.db"); + taskManager.copyFileToContainer(MountableFile.forHostPath(tmpdir + "/test.db"), "/sqlite/test.db"); + jobManager.execInContainer("bash", "-c", "chmod 777 /sqlite/test.db"); + taskManager.execInContainer("bash", "-c", "chmod 777 /sqlite/test.db"); + } catch (Exception e) { + log.error("init test.db and copy test.db to container error", e); + Files.deleteIfExists(new File(tmpdir + "/test.db").toPath()); + } + } +} diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/init_sql/sqlite_init.conf b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/init_sql/sqlite_init.conf new file mode 100644 index 00000000000..aeef3d5fa58 --- /dev/null +++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/init_sql/sqlite_init.conf @@ -0,0 +1,212 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +source_table = """ CREATE TABLE source ( `id` INTEGER PRIMARY KEY AUTOINCREMENT, `name` CHAR ( 10 ), `age` INT ) """ + +sink_table = """ CREATE TABLE sink ( `id` INTEGER PRIMARY KEY AUTOINCREMENT, `name` CHAR ( 10 ), `age` INT ) """ + +type_source_table = """ +CREATE TABLE `type_source_table` ( + `binary` BINARY ( 64 ) DEFAULT NULL, + `blob` BLOB, + `long_varbinary` MEDIUMBLOB, + `longblob` LONGBLOB, + `tinyblob` TINYBLOB, + `varbinary` VARBINARY ( 100 ) DEFAULT NULL, + + `tinyint` TINYINT DEFAULT NULL, + `tinyint_unsigned` TINYINT UNSIGNED DEFAULT NULL, + `smallint` SMALLINT DEFAULT NULL, + `smallint_unsigned` SMALLINT UNSIGNED DEFAULT NULL, + `mediumint` MEDIUMINT DEFAULT NULL, + `mediumint_unsigned` MEDIUMINT UNSIGNED DEFAULT NULL, + `int` INT DEFAULT NULL, + `int_unsigned` INT UNSIGNED DEFAULT NULL, + `integer` INT DEFAULT NULL, + `integer_unsigned` INT UNSIGNED DEFAULT NULL, + `bigint` BIGINT DEFAULT NULL, + `bigint_unsigned` BIGINT UNSIGNED DEFAULT NULL, + `numeric` DECIMAL ( 10, 0 ) DEFAULT NULL, + `decimal` DECIMAL ( 10, 0 ) DEFAULT NULL, + `float` FLOAT DEFAULT NULL, + `double` DOUBLE DEFAULT NULL, + `double_precision` DOUBLE DEFAULT NULL, + + `longtext` LONGTEXT, + `mediumtext` MEDIUMTEXT, + `text` text, + `tinytext` TINYTEXT, + `varchar` VARCHAR ( 100 ) DEFAULT NULL, + `json` json DEFAULT NULL, + + `date` date DEFAULT NULL, + `datetime` datetime DEFAULT NULL, + `timestamp` TIMESTAMP NULL DEFAULT NULL +) +""" + +type_sink_table = """ +CREATE TABLE `type_sink_table` ( + `binary` BINARY ( 64 ) DEFAULT NULL, + `blob` BLOB, + `long_varbinary` MEDIUMBLOB, + `longblob` LONGBLOB, + `tinyblob` TINYBLOB, + `varbinary` VARBINARY ( 100 ) DEFAULT NULL, + + `tinyint` TINYINT DEFAULT NULL, + `tinyint_unsigned` TINYINT UNSIGNED DEFAULT NULL, + `smallint` SMALLINT DEFAULT NULL, + `smallint_unsigned` SMALLINT UNSIGNED DEFAULT NULL, + `mediumint` MEDIUMINT DEFAULT NULL, + `mediumint_unsigned` MEDIUMINT UNSIGNED DEFAULT NULL, + `int` INT DEFAULT NULL, + `int_unsigned` INT UNSIGNED DEFAULT NULL, + `integer` INT DEFAULT NULL, + `integer_unsigned` INT UNSIGNED DEFAULT NULL, + `bigint` BIGINT DEFAULT NULL, + `bigint_unsigned` BIGINT UNSIGNED DEFAULT NULL, + `numeric` DECIMAL ( 10, 0 ) DEFAULT NULL, + `decimal` DECIMAL ( 10, 0 ) DEFAULT NULL, + `float` FLOAT DEFAULT NULL, + `double` DOUBLE DEFAULT NULL, + `double_precision` DOUBLE DEFAULT NULL, + + `longtext` LONGTEXT, + `mediumtext` MEDIUMTEXT, + `text` text, + `tinytext` TINYTEXT, + `varchar` VARCHAR ( 100 ) DEFAULT NULL, + `json` json DEFAULT NULL, + + `date` date DEFAULT NULL, + `datetime` datetime DEFAULT NULL, + `timestamp` TIMESTAMP NULL DEFAULT NULL +) +""" + +insert_type_source_table_sql = """ +INSERT INTO `type_source_table` ( + `binary`, + `blob`, + `long_varbinary`, + `longblob`, + `tinyblob`, + `varbinary`, + `tinyint`, + `tinyint_unsigned`, + `smallint`, + `smallint_unsigned`, + `mediumint`, + `mediumint_unsigned`, + `int`, + `int_unsigned`, + `integer`, + `integer_unsigned`, + `bigint`, + `bigint_unsigned`, + `numeric`, + `decimal`, + `float`, + `double`, + `double_precision`, + `longtext`, + `mediumtext`, + `text`, + `tinytext`, + `varchar`, + `json`, + `date`, + `datetime`, + `timestamp` +) +VALUES + ( + NULL, + NULL, + NULL, + NULL, + NULL, + NULL, + 1, + 1, + 1, + 1, + 1, + 1, + 1, + 1, + 1, + 1, + 1, + 1, + 1, + 1, + 1, + 1, + 1, + 'a', + 'a', + 'a', + 'a', + 'a', + '{}', + '2022-09-22', + '2022-09-22 15:07:44', + '2022-09-22 15:07:55' + ) +""" + +check_type_sink_table_sql = """ +SELECT + count( 1 ) +FROM + ( SELECT * FROM type_source_table UNION ALL SELECT * FROM type_sink_table ) a +GROUP BY + `binary`, + `blob`, + `long_varbinary`, + `longblob`, + `tinyblob`, + `varbinary`, + `tinyint`, + `tinyint_unsigned`, + `smallint`, + `smallint_unsigned`, + `mediumint`, + `mediumint_unsigned`, + `int`, + `int_unsigned`, + `integer`, + `integer_unsigned`, + `bigint`, + `bigint_unsigned`, + `numeric`, + `decimal`, + `float`, + `double`, + `double_precision`, + `longtext`, + `mediumtext`, + `text`, + `tinytext`, + `varchar`, + `json`, + `date`, + `datetime`, + `timestamp` +""" \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/jdbc_sqlite_source_and_sink.conf b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/jdbc_sqlite_source_and_sink.conf new file mode 100644 index 00000000000..a3273d98d65 --- /dev/null +++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/jdbc_sqlite_source_and_sink.conf @@ -0,0 +1,51 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +###### +###### This config file is a demonstration of streaming processing in seatunnel config +###### + +env { + # You can set flink configuration here + execution.parallelism = 1 + job.mode = "BATCH" +} + +source { + Jdbc { + driver = org.sqlite.JDBC + url = "jdbc:sqlite:/sqlite/test.db" + user = "" + password = "" + query = "select age, name from source" + } +} + +transform { + + # If you would like to get more information about how to configure seatunnel and see full list of transform plugins, + # please go to https://seatunnel.apache.org/docs/transform/sql +} + +sink { + Jdbc { + driver = org.sqlite.JDBC + url = "jdbc:sqlite:/sqlite/test.db" + user = "" + password = "" + query = "insert into sink(age, name) values(?, ?)" + } +} \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/jdbc_sqlite_source_and_sink_datatype.conf b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/jdbc_sqlite_source_and_sink_datatype.conf new file mode 100644 index 00000000000..68525fc8c09 --- /dev/null +++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/jdbc_sqlite_source_and_sink_datatype.conf @@ -0,0 +1,147 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env { + source.parallelism = 1 + job.mode = "BATCH" +} + +source{ + jdbc{ + url = "jdbc:sqlite:/sqlite/test.db" + driver = org.sqlite.JDBC + user = "" + password = "" + query = """ SELECT + `binary`, + `blob`, + `long_varbinary`, + `longblob`, + `tinyblob`, + `varbinary`, + `tinyint`, + `tinyint_unsigned`, + `smallint`, + `smallint_unsigned`, + `mediumint`, + `mediumint_unsigned`, + `int`, + `int_unsigned`, + `integer`, + `integer_unsigned`, + `bigint`, + `bigint_unsigned`, + `numeric`, + `decimal`, + `float`, + `double`, + `double_precision`, + `longtext`, + `mediumtext`, + `text`, + `tinytext`, + `varchar`, + `json`, + `date`, + `datetime`, + `timestamp` + + FROM `type_source_table` + """ + } +} + +transform { +} + +sink { + jdbc { + url = "jdbc:sqlite:/sqlite/test.db" + driver = org.sqlite.JDBC + user = "" + password = "" + query = """ INSERT INTO `type_sink_table` ( + `binary`, + `blob`, + `long_varbinary`, + `longblob`, + `tinyblob`, + `varbinary`, + `tinyint`, + `tinyint_unsigned`, + `smallint`, + `smallint_unsigned`, + `mediumint`, + `mediumint_unsigned`, + `int`, + `int_unsigned`, + `integer`, + `integer_unsigned`, + `bigint`, + `bigint_unsigned`, + `numeric`, + `decimal`, + `float`, + `double`, + `double_precision`, + `longtext`, + `mediumtext`, + `text`, + `tinytext`, + `varchar`, + `json`, + `date`, + `datetime`, + `timestamp` + ) + VALUES + ( + ?, + ?, + ?, + ?, + ?, + ?, + ?, + ?, + ?, + ?, + ?, + ?, + ?, + ?, + ?, + ?, + ?, + ?, + ?, + ?, + ?, + ?, + ?, + ?, + ?, + ?, + ?, + ?, + ?, + ?, + ?, + ? + )""" + } +} \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/pom.xml b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/pom.xml index ee7629f1498..8cd754f05e3 100644 --- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/pom.xml +++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/pom.xml @@ -96,6 +96,11 @@ com.microsoft.sqlserver mssql-jdbc + + org.xerial + sqlite-jdbc + test + diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcSqliteIT.java b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcSqliteIT.java new file mode 100644 index 00000000000..c9d2815dc65 --- /dev/null +++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcSqliteIT.java @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.e2e.spark.v2.jdbc; + +import org.apache.seatunnel.common.config.CheckConfigUtil; +import org.apache.seatunnel.core.starter.config.ConfigBuilder; +import org.apache.seatunnel.e2e.spark.SparkContainer; + +import org.apache.seatunnel.shade.com.typesafe.config.Config; + +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.Container; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.utility.MountableFile; + +import java.io.File; +import java.io.IOException; +import java.net.URI; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; + +@Slf4j +public class JdbcSqliteIT extends SparkContainer { + private String tmpdir; + private Config config; + private static final List> TEST_DATASET = generateTestDataset(); + private static final String THIRD_PARTY_PLUGINS_URL = "https://repo1.maven.org/maven2/org/xerial/sqlite-jdbc/3.39.3.0/sqlite-jdbc-3.39.3.0.jar"; + + private void initTestDb() throws Exception { + URI resource = Objects.requireNonNull(JdbcSqliteIT.class.getResource("/jdbc/init_sql/sqlite_init.conf")).toURI(); + config = new ConfigBuilder(Paths.get(resource)).getConfig(); + CheckConfigUtil.checkAllExists(this.config, "source_table", "sink_table", "type_source_table", + "type_sink_table", "insert_type_source_table_sql", "check_type_sink_table_sql"); + tmpdir = Paths.get(System.getProperty("java.io.tmpdir")).toString(); + Connection connection = null; + try { + Class.forName("org.sqlite.JDBC"); + connection = DriverManager.getConnection("jdbc:sqlite:" + tmpdir + "/test.db", "", ""); + Statement statement = connection.createStatement(); + statement.execute("drop table if exists source"); + statement.execute("drop table if exists sink"); + statement.execute("drop table if exists type_source_table"); + statement.execute("drop table if exists type_sink_table"); + statement.execute(config.getString("source_table")); + statement.execute(config.getString("sink_table")); + statement.execute(config.getString("type_source_table")); + statement.execute(config.getString("type_sink_table")); + statement.execute(config.getString("insert_type_source_table_sql")); + + String sql = "insert into source(age, name) values(?, ?)"; + connection.setAutoCommit(false); + try (PreparedStatement preparedStatement = connection.prepareStatement(sql)) { + for (List row : TEST_DATASET) { + preparedStatement.setInt(1, (Integer) row.get(0)); + preparedStatement.setString(2, (String) row.get(1)); + preparedStatement.addBatch(); + } + preparedStatement.executeBatch(); + } + connection.commit(); + } catch (Exception e) { + if (null != connection) { + try { + connection.rollback(); + } catch (SQLException ex) { + ex.printStackTrace(); + } + } + throw e; + } + } + + private static List> generateTestDataset() { + List> rows = new ArrayList<>(); + for (int i = 1; i <= 100; i++) { + rows.add(Arrays.asList(i, String.format("test_%s", i))); + } + return rows; + } + + @Test + public void testJdbcSqliteSourceAndSinkDataType() throws Exception { + Container.ExecResult execResult = executeSeaTunnelSparkJob("/jdbc/jdbc_sqlite_source_and_sink_datatype.conf"); + Assertions.assertEquals(0, execResult.getExitCode()); + master.copyFileFromContainer(Paths.get("/sqlite/test.db").toString(), new File(tmpdir + "/test.db").toPath().toString()); + checkSinkDataTypeTable(); + } + + private void checkSinkDataTypeTable() throws Exception { + URI resource = Objects.requireNonNull(JdbcSqliteIT.class.getResource("/jdbc/init_sql/sqlite_init.conf")).toURI(); + config = new ConfigBuilder(Paths.get(resource)).getConfig(); + CheckConfigUtil.checkAllExists(this.config, "source_table", "sink_table", "type_source_table", + "type_sink_table", "insert_type_source_table_sql", "check_type_sink_table_sql"); + + try (Connection connection = DriverManager.getConnection("jdbc:sqlite:" + tmpdir + "/test.db", "", "")) { + Statement statement = connection.createStatement(); + ResultSet resultSet = statement.executeQuery(config.getString("check_type_sink_table_sql")); + resultSet.next(); + Assertions.assertEquals(resultSet.getInt(1), 2); + } + } + + @Test + public void testJdbcSqliteSourceAndSink() throws IOException, InterruptedException, SQLException { + Container.ExecResult execResult = executeSeaTunnelSparkJob("/jdbc/jdbc_sqlite_source_and_sink.conf"); + Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr()); + master.copyFileFromContainer(Paths.get("/sqlite/test.db").toString(), new File(tmpdir + "/test.db").toPath().toString()); + // query result + String sql = "select age, name from sink order by age asc"; + List> result = new ArrayList<>(); + try (Connection connection = DriverManager.getConnection("jdbc:sqlite:" + tmpdir + "/test.db", "", "")) { + Statement statement = connection.createStatement(); + ResultSet resultSet = statement.executeQuery(sql); + while (resultSet.next()) { + result.add(Arrays.asList( + resultSet.getInt(1), + resultSet.getString(2))); + } + Assertions.assertIterableEquals(TEST_DATASET, result); + } + } + + @AfterEach + public void closeResource() throws SQLException, IOException { + // remove the temp test.db + Files.deleteIfExists(new File(tmpdir + "/test.db").toPath()); + } + + @Override + protected void executeExtraCommands(GenericContainer container) throws IOException, InterruptedException { + Container.ExecResult extraCommands = container.execInContainer("bash", "-c", "mkdir -p /tmp/seatunnel/plugins/Jdbc/lib && cd /tmp/seatunnel/plugins/Jdbc/lib && curl -O " + THIRD_PARTY_PLUGINS_URL); + Assertions.assertEquals(0, extraCommands.getExitCode()); + + Container.ExecResult mkdirCommands = container.execInContainer("bash", "-c", "mkdir -p " + "/sqlite"); + Assertions.assertEquals(0, mkdirCommands.getExitCode()); + + Container.ExecResult chmodCommands = container.execInContainer("bash", "-c", "chmod 777 -R /sqlite"); + Assertions.assertEquals(0, chmodCommands.getExitCode()); + + try { + initTestDb(); + // copy db file to container, dist file path in container is /tmp/seatunnel/data/test.db + container.copyFileToContainer(MountableFile.forHostPath(tmpdir + "/test.db"), "/sqlite/test.db"); + container.execInContainer("bash", "-c", "chmod 777 /sqlite/test.db"); + } catch (Exception e) { + log.error("init test.db and copy test.db to container error", e); + Files.deleteIfExists(new File(tmpdir + "/test.db").toPath()); + } + } + +} diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/init_sql/sqlite_init.conf b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/init_sql/sqlite_init.conf new file mode 100644 index 00000000000..aeef3d5fa58 --- /dev/null +++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/init_sql/sqlite_init.conf @@ -0,0 +1,212 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +source_table = """ CREATE TABLE source ( `id` INTEGER PRIMARY KEY AUTOINCREMENT, `name` CHAR ( 10 ), `age` INT ) """ + +sink_table = """ CREATE TABLE sink ( `id` INTEGER PRIMARY KEY AUTOINCREMENT, `name` CHAR ( 10 ), `age` INT ) """ + +type_source_table = """ +CREATE TABLE `type_source_table` ( + `binary` BINARY ( 64 ) DEFAULT NULL, + `blob` BLOB, + `long_varbinary` MEDIUMBLOB, + `longblob` LONGBLOB, + `tinyblob` TINYBLOB, + `varbinary` VARBINARY ( 100 ) DEFAULT NULL, + + `tinyint` TINYINT DEFAULT NULL, + `tinyint_unsigned` TINYINT UNSIGNED DEFAULT NULL, + `smallint` SMALLINT DEFAULT NULL, + `smallint_unsigned` SMALLINT UNSIGNED DEFAULT NULL, + `mediumint` MEDIUMINT DEFAULT NULL, + `mediumint_unsigned` MEDIUMINT UNSIGNED DEFAULT NULL, + `int` INT DEFAULT NULL, + `int_unsigned` INT UNSIGNED DEFAULT NULL, + `integer` INT DEFAULT NULL, + `integer_unsigned` INT UNSIGNED DEFAULT NULL, + `bigint` BIGINT DEFAULT NULL, + `bigint_unsigned` BIGINT UNSIGNED DEFAULT NULL, + `numeric` DECIMAL ( 10, 0 ) DEFAULT NULL, + `decimal` DECIMAL ( 10, 0 ) DEFAULT NULL, + `float` FLOAT DEFAULT NULL, + `double` DOUBLE DEFAULT NULL, + `double_precision` DOUBLE DEFAULT NULL, + + `longtext` LONGTEXT, + `mediumtext` MEDIUMTEXT, + `text` text, + `tinytext` TINYTEXT, + `varchar` VARCHAR ( 100 ) DEFAULT NULL, + `json` json DEFAULT NULL, + + `date` date DEFAULT NULL, + `datetime` datetime DEFAULT NULL, + `timestamp` TIMESTAMP NULL DEFAULT NULL +) +""" + +type_sink_table = """ +CREATE TABLE `type_sink_table` ( + `binary` BINARY ( 64 ) DEFAULT NULL, + `blob` BLOB, + `long_varbinary` MEDIUMBLOB, + `longblob` LONGBLOB, + `tinyblob` TINYBLOB, + `varbinary` VARBINARY ( 100 ) DEFAULT NULL, + + `tinyint` TINYINT DEFAULT NULL, + `tinyint_unsigned` TINYINT UNSIGNED DEFAULT NULL, + `smallint` SMALLINT DEFAULT NULL, + `smallint_unsigned` SMALLINT UNSIGNED DEFAULT NULL, + `mediumint` MEDIUMINT DEFAULT NULL, + `mediumint_unsigned` MEDIUMINT UNSIGNED DEFAULT NULL, + `int` INT DEFAULT NULL, + `int_unsigned` INT UNSIGNED DEFAULT NULL, + `integer` INT DEFAULT NULL, + `integer_unsigned` INT UNSIGNED DEFAULT NULL, + `bigint` BIGINT DEFAULT NULL, + `bigint_unsigned` BIGINT UNSIGNED DEFAULT NULL, + `numeric` DECIMAL ( 10, 0 ) DEFAULT NULL, + `decimal` DECIMAL ( 10, 0 ) DEFAULT NULL, + `float` FLOAT DEFAULT NULL, + `double` DOUBLE DEFAULT NULL, + `double_precision` DOUBLE DEFAULT NULL, + + `longtext` LONGTEXT, + `mediumtext` MEDIUMTEXT, + `text` text, + `tinytext` TINYTEXT, + `varchar` VARCHAR ( 100 ) DEFAULT NULL, + `json` json DEFAULT NULL, + + `date` date DEFAULT NULL, + `datetime` datetime DEFAULT NULL, + `timestamp` TIMESTAMP NULL DEFAULT NULL +) +""" + +insert_type_source_table_sql = """ +INSERT INTO `type_source_table` ( + `binary`, + `blob`, + `long_varbinary`, + `longblob`, + `tinyblob`, + `varbinary`, + `tinyint`, + `tinyint_unsigned`, + `smallint`, + `smallint_unsigned`, + `mediumint`, + `mediumint_unsigned`, + `int`, + `int_unsigned`, + `integer`, + `integer_unsigned`, + `bigint`, + `bigint_unsigned`, + `numeric`, + `decimal`, + `float`, + `double`, + `double_precision`, + `longtext`, + `mediumtext`, + `text`, + `tinytext`, + `varchar`, + `json`, + `date`, + `datetime`, + `timestamp` +) +VALUES + ( + NULL, + NULL, + NULL, + NULL, + NULL, + NULL, + 1, + 1, + 1, + 1, + 1, + 1, + 1, + 1, + 1, + 1, + 1, + 1, + 1, + 1, + 1, + 1, + 1, + 'a', + 'a', + 'a', + 'a', + 'a', + '{}', + '2022-09-22', + '2022-09-22 15:07:44', + '2022-09-22 15:07:55' + ) +""" + +check_type_sink_table_sql = """ +SELECT + count( 1 ) +FROM + ( SELECT * FROM type_source_table UNION ALL SELECT * FROM type_sink_table ) a +GROUP BY + `binary`, + `blob`, + `long_varbinary`, + `longblob`, + `tinyblob`, + `varbinary`, + `tinyint`, + `tinyint_unsigned`, + `smallint`, + `smallint_unsigned`, + `mediumint`, + `mediumint_unsigned`, + `int`, + `int_unsigned`, + `integer`, + `integer_unsigned`, + `bigint`, + `bigint_unsigned`, + `numeric`, + `decimal`, + `float`, + `double`, + `double_precision`, + `longtext`, + `mediumtext`, + `text`, + `tinytext`, + `varchar`, + `json`, + `date`, + `datetime`, + `timestamp` +""" \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_sqlite_source_and_sink.conf b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_sqlite_source_and_sink.conf new file mode 100644 index 00000000000..d67a5677a58 --- /dev/null +++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_sqlite_source_and_sink.conf @@ -0,0 +1,57 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +###### +###### This config file is a demonstration of streaming processing in seatunnel config +###### + +env { + # You can set flink configuration here + execution.parallelism = 1 + job.mode = "BATCH" +} + +source { + jdbc { + driver = org.sqlite.JDBC + url = "jdbc:sqlite:/sqlite/test.db" + user = "" + password = "" + query = "select age, name from source" + } + + # If you would like to get more information about how to configure seatunnel and see full list of source plugins, + # please go to https://seatunnel.apache.org/docs/connector-v2/source/Jdbc +} + +transform { + + # If you would like to get more information about how to configure seatunnel and see full list of transform plugins, + # please go to https://seatunnel.apache.org/docs/transform/sql +} + +sink { + jdbc { + driver = org.sqlite.JDBC + url = "jdbc:sqlite:/sqlite/test.db" + user = "" + password = "" + query = "insert into sink(age, name) values(?, ?)" + } + + # If you would like to get more information about how to configure seatunnel and see full list of sink plugins, + # please go to https://seatunnel.apache.org/docs/connector-v2/sink/Jdbc +} \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_sqlite_source_and_sink_datatype.conf b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_sqlite_source_and_sink_datatype.conf new file mode 100644 index 00000000000..68525fc8c09 --- /dev/null +++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_sqlite_source_and_sink_datatype.conf @@ -0,0 +1,147 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env { + source.parallelism = 1 + job.mode = "BATCH" +} + +source{ + jdbc{ + url = "jdbc:sqlite:/sqlite/test.db" + driver = org.sqlite.JDBC + user = "" + password = "" + query = """ SELECT + `binary`, + `blob`, + `long_varbinary`, + `longblob`, + `tinyblob`, + `varbinary`, + `tinyint`, + `tinyint_unsigned`, + `smallint`, + `smallint_unsigned`, + `mediumint`, + `mediumint_unsigned`, + `int`, + `int_unsigned`, + `integer`, + `integer_unsigned`, + `bigint`, + `bigint_unsigned`, + `numeric`, + `decimal`, + `float`, + `double`, + `double_precision`, + `longtext`, + `mediumtext`, + `text`, + `tinytext`, + `varchar`, + `json`, + `date`, + `datetime`, + `timestamp` + + FROM `type_source_table` + """ + } +} + +transform { +} + +sink { + jdbc { + url = "jdbc:sqlite:/sqlite/test.db" + driver = org.sqlite.JDBC + user = "" + password = "" + query = """ INSERT INTO `type_sink_table` ( + `binary`, + `blob`, + `long_varbinary`, + `longblob`, + `tinyblob`, + `varbinary`, + `tinyint`, + `tinyint_unsigned`, + `smallint`, + `smallint_unsigned`, + `mediumint`, + `mediumint_unsigned`, + `int`, + `int_unsigned`, + `integer`, + `integer_unsigned`, + `bigint`, + `bigint_unsigned`, + `numeric`, + `decimal`, + `float`, + `double`, + `double_precision`, + `longtext`, + `mediumtext`, + `text`, + `tinytext`, + `varchar`, + `json`, + `date`, + `datetime`, + `timestamp` + ) + VALUES + ( + ?, + ?, + ?, + ?, + ?, + ?, + ?, + ?, + ?, + ?, + ?, + ?, + ?, + ?, + ?, + ?, + ?, + ?, + ?, + ?, + ?, + ?, + ?, + ?, + ?, + ?, + ?, + ?, + ?, + ?, + ?, + ? + )""" + } +} \ No newline at end of file