From e94a14d5ec27d794b806ba701bd11317d325564d Mon Sep 17 00:00:00 2001 From: liugddx <804167098@qq.com> Date: Sat, 5 Nov 2022 22:44:56 +0800 Subject: [PATCH 01/29] add tablestore --- .../connector-jdbc/pom.xml | 20 ++++- .../jdbc/config/JdbcSourceOptions.java | 5 ++ .../dialect/tablestore/TablestoreDialect.java | 50 ++++++++++++ .../tablestore/TablestoreDialectFactory.java | 40 +++++++++ .../TablestoreJdbcRowConverter.java | 39 +++++++++ .../tablestore/TablestoreTypeMapper.java | 81 +++++++++++++++++++ .../seatunnel/jdbc/source/JdbcSource.java | 4 + 7 files changed, 238 insertions(+), 1 deletion(-) create mode 100644 seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/tablestore/TablestoreDialect.java create mode 100644 seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/tablestore/TablestoreDialectFactory.java create mode 100644 seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/tablestore/TablestoreJdbcRowConverter.java create mode 100644 seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/tablestore/TablestoreTypeMapper.java diff --git a/seatunnel-connectors-v2/connector-jdbc/pom.xml b/seatunnel-connectors-v2/connector-jdbc/pom.xml index c04b9c9bd79..aca2f10b251 100644 --- a/seatunnel-connectors-v2/connector-jdbc/pom.xml +++ b/seatunnel-connectors-v2/connector-jdbc/pom.xml @@ -37,6 +37,7 @@ 5.2.5-HBase-2.x 12.2.0.1 db2jcc4 + 5.13.5 @@ -84,11 +85,24 @@ provided + + com.aliyun.openservices + tablestore-jdbc + ${tablestore.version} + provided + + + + org.apache.seatunnel + connector-common + ${project.version} + + mysql mysql-connector-java @@ -119,6 +133,10 @@ com.ibm.db2.jcc db2jcc + + com.aliyun.openservices + tablestore-jdbc + - \ No newline at end of file + diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSourceOptions.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSourceOptions.java index adbd5244c37..453fd557168 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSourceOptions.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSourceOptions.java @@ -19,6 +19,7 @@ import static org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcConfig.buildJdbcConnectionOptions; +import org.apache.seatunnel.connectors.seatunnel.common.config.CommonConfig; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.options.JdbcConnectionOptions; import org.apache.seatunnel.shade.com.typesafe.config.Config; @@ -33,6 +34,7 @@ @AllArgsConstructor public class JdbcSourceOptions implements Serializable { private JdbcConnectionOptions jdbcConnectionOptions; + private Config schema; private String partitionColumn; private Long partitionUpperBound; private Long partitionLowerBound; @@ -40,6 +42,9 @@ public class JdbcSourceOptions implements Serializable { public JdbcSourceOptions(Config config) { this.jdbcConnectionOptions = buildJdbcConnectionOptions(config); + if (config.hasPath(CommonConfig.SCHEMA)) { + this.schema = config.getConfig(CommonConfig.SCHEMA); + } if (config.hasPath(JdbcConfig.PARTITION_COLUMN)) { this.partitionColumn = config.getString(JdbcConfig.PARTITION_COLUMN); } diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/tablestore/TablestoreDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/tablestore/TablestoreDialect.java new file mode 100644 index 00000000000..bf1645a6f1b --- /dev/null +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/tablestore/TablestoreDialect.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.tablestore; + +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; + +public class TablestoreDialect implements JdbcDialect { + @Override + public String dialectName() { + return "Tablestore"; + } + + @Override + public JdbcRowConverter getRowConverter() { + return new TablestoreJdbcRowConverter(); + } + + @Override + public JdbcDialectTypeMapper getJdbcDialectTypeMapper() { + return new TablestoreTypeMapper(); + } + + @Override + public PreparedStatement creatPreparedStatement(Connection connection, String queryTemplate, int fetchSize) throws SQLException { + PreparedStatement statement = connection.prepareStatement(queryTemplate); + statement.setFetchSize(fetchSize); + return statement; + } +} diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/tablestore/TablestoreDialectFactory.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/tablestore/TablestoreDialectFactory.java new file mode 100644 index 00000000000..995cbb6f351 --- /dev/null +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/tablestore/TablestoreDialectFactory.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.tablestore; + +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectFactory; + +import com.google.auto.service.AutoService; + +/** + * Factory for {@link TablestoreDialect}. + */ + +@AutoService(JdbcDialectFactory.class) +public class TablestoreDialectFactory implements JdbcDialectFactory { + @Override + public boolean acceptsURL(String url) { + return url.startsWith("jdbc:ots:https:"); + } + + @Override + public JdbcDialect create() { + return new TablestoreDialect(); + } +} diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/tablestore/TablestoreJdbcRowConverter.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/tablestore/TablestoreJdbcRowConverter.java new file mode 100644 index 00000000000..81e2df6b941 --- /dev/null +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/tablestore/TablestoreJdbcRowConverter.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.tablestore; + +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.AbstractJdbcRowConverter; + +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; + +public class TablestoreJdbcRowConverter extends AbstractJdbcRowConverter { + + @Override + public String converterName() { + return "Tablestore"; + } + + @Override + public SeaTunnelRow toInternal(ResultSet rs, ResultSetMetaData metaData, SeaTunnelRowType typeInfo) throws SQLException { + return super.toInternal(rs, metaData, typeInfo); + } +} diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/tablestore/TablestoreTypeMapper.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/tablestore/TablestoreTypeMapper.java new file mode 100644 index 00000000000..e9712995cde --- /dev/null +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/tablestore/TablestoreTypeMapper.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.tablestore; + +import org.apache.seatunnel.api.table.type.BasicType; +import org.apache.seatunnel.api.table.type.DecimalType; +import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper; + +import lombok.extern.slf4j.Slf4j; + +import java.sql.ResultSetMetaData; +import java.sql.SQLException; + +@Slf4j +public class TablestoreTypeMapper implements JdbcDialectTypeMapper { + + + // ============================data types===================== + + private static final String TABLESTORE_UNKNOWN = "UNKNOWN"; + + private static final String TABLESTORE_BOOL = "BOOL"; + + // -------------------------number---------------------------- + private static final String TABLESTORE_BIGINT = "BIGINT"; + private static final String TABLESTORE_DOUBLE = "DOUBLE"; + // -------------------------string---------------------------- + private static final String TABLESTORE_VARCHAR = "VARCHAR"; + private static final String TABLESTORE_MEDIUMTEXT = "MEDIUMTEXT"; + + // ------------------------------blob------------------------- + private static final String TABLESTORE_VARBINARY = "VARBINARY"; + private static final String TABLESTORE_MEDIUMBLOB = "MEDIUMBLOB"; + + @SuppressWarnings("checkstyle:MagicNumber") + @Override + public SeaTunnelDataType mapping(ResultSetMetaData metadata, int colIndex) throws SQLException { + String tablestoreServerType = metadata.getColumnTypeName(colIndex).toUpperCase(); + int precision = metadata.getPrecision(colIndex); + int scale = metadata.getScale(colIndex); + switch (tablestoreServerType) { + case TABLESTORE_BOOL: + return BasicType.BOOLEAN_TYPE; + case TABLESTORE_BIGINT: + return BasicType.LONG_TYPE; + case TABLESTORE_DOUBLE: + return new DecimalType(precision, scale); + case TABLESTORE_VARCHAR: + case TABLESTORE_MEDIUMTEXT: + return BasicType.STRING_TYPE; + case TABLESTORE_VARBINARY: + case TABLESTORE_MEDIUMBLOB: + return PrimitiveByteArrayType.INSTANCE; + //Doesn't support yet + case TABLESTORE_UNKNOWN: + default: + final String jdbcColumnName = metadata.getColumnName(colIndex); + throw new UnsupportedOperationException( + String.format( + "Doesn't support TABLESTORE type '%s' on column '%s' yet.", + tablestoreServerType, jdbcColumnName)); + } + } +} diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java index 9765980990f..19663ea3c83 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java @@ -28,6 +28,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.common.constants.PluginType; +import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema; import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSourceOptions; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.JdbcInputFormat; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.JdbcConnectionProvider; @@ -146,6 +147,9 @@ private SeaTunnelRowType initTableField(Connection conn) { } } catch (Exception e) { LOG.warn("get row type info exception", e); + if (jdbcSourceOptions.getSchema() != null) { + return SeaTunnelSchema.buildWithConfig(jdbcSourceOptions.getSchema()).getSeaTunnelRowType(); + } } return new SeaTunnelRowType(fieldNames.toArray(new String[0]), seaTunnelDataTypes.toArray(new SeaTunnelDataType[0])); } From d036fc681082dd22939e6fced240f1242b34dab7 Mon Sep 17 00:00:00 2001 From: liugddx <804167098@qq.com> Date: Sun, 6 Nov 2022 21:09:08 +0800 Subject: [PATCH 02/29] fix unable to release link problem --- seatunnel-connectors-v2/connector-jdbc/pom.xml | 2 +- .../connectors/seatunnel/jdbc/source/JdbcSource.java | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/seatunnel-connectors-v2/connector-jdbc/pom.xml b/seatunnel-connectors-v2/connector-jdbc/pom.xml index aca2f10b251..cd4a5f221e4 100644 --- a/seatunnel-connectors-v2/connector-jdbc/pom.xml +++ b/seatunnel-connectors-v2/connector-jdbc/pom.xml @@ -37,7 +37,7 @@ 5.2.5-HBase-2.x 12.2.0.1 db2jcc4 - 5.13.5 + 5.13.9 diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java index 19663ea3c83..78efeb1a5f2 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java @@ -78,8 +78,8 @@ public void prepare(Config pluginConfig) throws PrepareFailException { jdbcConnectionProvider = new SimpleJdbcConnectionProvider(jdbcSourceOptions.getJdbcConnectionOptions()); query = jdbcSourceOptions.getJdbcConnectionOptions().query; jdbcDialect = JdbcDialectLoader.load(jdbcSourceOptions.getJdbcConnectionOptions().getUrl()); - try { - typeInfo = initTableField(jdbcConnectionProvider.getOrEstablishConnection()); + try (Connection connection = jdbcConnectionProvider.getOrEstablishConnection()) { + typeInfo = initTableField(connection); partitionParameter = initPartitionParameterAndExtendSql(jdbcConnectionProvider.getOrEstablishConnection()); } catch (Exception e) { throw new PrepareFailException("jdbc", PluginType.SOURCE, e.toString()); From 77c9ad4386723f02d75d039489d467288f957ab7 Mon Sep 17 00:00:00 2001 From: liugddx <804167098@qq.com> Date: Sun, 6 Nov 2022 21:58:42 +0800 Subject: [PATCH 03/29] remove schema --- .../jdbc/config/JdbcSourceOptions.java | 5 ----- .../seatunnel/jdbc/source/JdbcSource.java | 17 ++--------------- 2 files changed, 2 insertions(+), 20 deletions(-) diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSourceOptions.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSourceOptions.java index 453fd557168..adbd5244c37 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSourceOptions.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSourceOptions.java @@ -19,7 +19,6 @@ import static org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcConfig.buildJdbcConnectionOptions; -import org.apache.seatunnel.connectors.seatunnel.common.config.CommonConfig; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.options.JdbcConnectionOptions; import org.apache.seatunnel.shade.com.typesafe.config.Config; @@ -34,7 +33,6 @@ @AllArgsConstructor public class JdbcSourceOptions implements Serializable { private JdbcConnectionOptions jdbcConnectionOptions; - private Config schema; private String partitionColumn; private Long partitionUpperBound; private Long partitionLowerBound; @@ -42,9 +40,6 @@ public class JdbcSourceOptions implements Serializable { public JdbcSourceOptions(Config config) { this.jdbcConnectionOptions = buildJdbcConnectionOptions(config); - if (config.hasPath(CommonConfig.SCHEMA)) { - this.schema = config.getConfig(CommonConfig.SCHEMA); - } if (config.hasPath(JdbcConfig.PARTITION_COLUMN)) { this.partitionColumn = config.getString(JdbcConfig.PARTITION_COLUMN); } diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java index 78efeb1a5f2..79d4671364c 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java @@ -28,7 +28,6 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.common.constants.PluginType; -import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema; import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSourceOptions; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.JdbcInputFormat; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.JdbcConnectionProvider; @@ -102,16 +101,7 @@ public Boundedness getBoundedness() { @Override public SeaTunnelDataType getProducedType() { - Connection conn; - SeaTunnelRowType seaTunnelDataType = null; - try { - conn = jdbcConnectionProvider.getOrEstablishConnection(); - seaTunnelDataType = initTableField(conn); - } catch (Exception e) { - LOG.warn("get row type info exception", e); - } - this.typeInfo = seaTunnelDataType; - return seaTunnelDataType; + return typeInfo; } @Override @@ -140,16 +130,13 @@ private SeaTunnelRowType initTableField(Connection conn) { ArrayList fieldNames = new ArrayList<>(); try { PreparedStatement ps = conn.prepareStatement(jdbcSourceOptions.getJdbcConnectionOptions().getQuery()); - ResultSetMetaData resultSetMetaData = ps.getMetaData(); + ResultSetMetaData resultSetMetaData = ps.executeQuery().getMetaData(); for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) { fieldNames.add(resultSetMetaData.getColumnName(i)); seaTunnelDataTypes.add(jdbcDialectTypeMapper.mapping(resultSetMetaData, i)); } } catch (Exception e) { LOG.warn("get row type info exception", e); - if (jdbcSourceOptions.getSchema() != null) { - return SeaTunnelSchema.buildWithConfig(jdbcSourceOptions.getSchema()).getSeaTunnelRowType(); - } } return new SeaTunnelRowType(fieldNames.toArray(new String[0]), seaTunnelDataTypes.toArray(new SeaTunnelDataType[0])); } From 77dd3962490d5684da9c8b197db050511d8dc549 Mon Sep 17 00:00:00 2001 From: liugddx <804167098@qq.com> Date: Sun, 6 Nov 2022 22:10:20 +0800 Subject: [PATCH 04/29] add doc --- docs/en/connector-v2/sink/Jdbc.md | 29 +++++++++++++++++------------ docs/en/connector-v2/source/Jdbc.md | 25 ++++++++++++++----------- 2 files changed, 31 insertions(+), 23 deletions(-) diff --git a/docs/en/connector-v2/sink/Jdbc.md b/docs/en/connector-v2/sink/Jdbc.md index 1414faeaa9a..e1d1de3f270 100644 --- a/docs/en/connector-v2/sink/Jdbc.md +++ b/docs/en/connector-v2/sink/Jdbc.md @@ -108,17 +108,18 @@ In the case of is_exactly_once = "true", Xa transactions are used. This requires there are some reference value for params above. -| datasource | driver | url | xa_data_source_class_name | maven | -|------------|----------------------------------------------|--------------------------------------------------------------------|----------------------------------------------------|-------------------------------------------------------------------------------------------------------------| -| MySQL | com.mysql.cj.jdbc.Driver | jdbc:mysql://localhost:3306/test | com.mysql.cj.jdbc.MysqlXADataSource | https://mvnrepository.com/artifact/mysql/mysql-connector-java | -| PostgreSQL | org.postgresql.Driver | jdbc:postgresql://localhost:5432/postgres | org.postgresql.xa.PGXADataSource | https://mvnrepository.com/artifact/org.postgresql/postgresql | -| DM | dm.jdbc.driver.DmDriver | jdbc:dm://localhost:5236 | dm.jdbc.driver.DmdbXADataSource | https://mvnrepository.com/artifact/com.dameng/DmJdbcDriver18 | -| Phoenix | org.apache.phoenix.queryserver.client.Driver | jdbc:phoenix:thin:url=http://localhost:8765;serialization=PROTOBUF | / | https://mvnrepository.com/artifact/com.aliyun.phoenix/ali-phoenix-shaded-thin-client | -| SQL Server | com.microsoft.sqlserver.jdbc.SQLServerDriver | jdbc:microsoft:sqlserver://localhost:1433 | com.microsoft.sqlserver.jdbc.SQLServerXADataSource | https://mvnrepository.com/artifact/com.microsoft.sqlserver/mssql-jdbc | -| Oracle | oracle.jdbc.OracleDriver | jdbc:oracle:thin:@localhost:1521/xepdb1 | oracle.jdbc.xa.OracleXADataSource | https://mvnrepository.com/artifact/com.oracle.database.jdbc/ojdbc8 | -| GBase8a | com.gbase.jdbc.Driver | jdbc:gbase://e2e_gbase8aDb:5258/test | / | https://www.gbase8.cn/wp-content/uploads/2020/10/gbase-connector-java-8.3.81.53-build55.5.7-bin_min_mix.jar | -| StarRocks | com.mysql.cj.jdbc.Driver | jdbc:mysql://localhost:3306/test | / | https://mvnrepository.com/artifact/mysql/mysql-connector-java | -| db2 | com.ibm.db2.jcc.DB2Driver | jdbc:db2://localhost:50000/testdb | com.ibm.db2.jcc.DB2XADataSource | https://mvnrepository.com/artifact/com.ibm.db2.jcc/db2jcc/db2jcc4 | +| datasource | driver | url | xa_data_source_class_name | maven | +|------------|-------------------------------------------------------------------------|------------------------------------------------------------------------------------------------------|----------------------------------------------------|-------------------------------------------------------------------------------------------------------------| +| MySQL | com.mysql.cj.jdbc.Driver | jdbc:mysql://localhost:3306/test | com.mysql.cj.jdbc.MysqlXADataSource | https://mvnrepository.com/artifact/mysql/mysql-connector-java | +| PostgreSQL | org.postgresql.Driver | jdbc:postgresql://localhost:5432/postgres | org.postgresql.xa.PGXADataSource | https://mvnrepository.com/artifact/org.postgresql/postgresql | +| DM | dm.jdbc.driver.DmDriver | jdbc:dm://localhost:5236 | dm.jdbc.driver.DmdbXADataSource | https://mvnrepository.com/artifact/com.dameng/DmJdbcDriver18 | +| Phoenix | org.apache.phoenix.queryserver.client.Driver | jdbc:phoenix:thin:url=http://localhost:8765;serialization=PROTOBUF | / | https://mvnrepository.com/artifact/com.aliyun.phoenix/ali-phoenix-shaded-thin-client | +| SQL Server | com.microsoft.sqlserver.jdbc.SQLServerDriver | jdbc:microsoft:sqlserver://localhost:1433 | com.microsoft.sqlserver.jdbc.SQLServerXADataSource | https://mvnrepository.com/artifact/com.microsoft.sqlserver/mssql-jdbc | +| Oracle | oracle.jdbc.OracleDriver | jdbc:oracle:thin:@localhost:1521/xepdb1 | oracle.jdbc.xa.OracleXADataSource | https://mvnrepository.com/artifact/com.oracle.database.jdbc/ojdbc8 | +| GBase8a | com.gbase.jdbc.Driver | jdbc:gbase://e2e_gbase8aDb:5258/test | / | https://www.gbase8.cn/wp-content/uploads/2020/10/gbase-connector-java-8.3.81.53-build55.5.7-bin_min_mix.jar | +| StarRocks | com.mysql.cj.jdbc.Driver | jdbc:mysql://localhost:3306/test | / | https://mvnrepository.com/artifact/mysql/mysql-connector-java | +| db2 | com.ibm.db2.jcc.DB2Driver | jdbc:db2://localhost:50000/testdb | com.ibm.db2.jcc.DB2XADataSource | https://mvnrepository.com/artifact/com.ibm.db2.jcc/db2jcc/db2jcc4 | +| tablestore | com.alicloud.openservices.tablestore.jdbc.OTSDriver | jdbc:ots:https://myinstance.cn-hangzhou.ots.aliyuncs.com/myinstance | / | https://mvnrepository.com/artifact/com.aliyun.openservices/tablestore-jdbc | ## Example @@ -167,4 +168,8 @@ jdbc { - [Feature] Support SQL Server JDBC Source ([2646](https://github.com/apache/incubator-seatunnel/pull/2646)) - [Feature] Support Oracle JDBC Source ([2550](https://github.com/apache/incubator-seatunnel/pull/2550)) - [Feature] Support StarRocks JDBC Source ([3060](https://github.com/apache/incubator-seatunnel/pull/3060)) -- [Feature] Support DB2 JDBC Sink ([2410](https://github.com/apache/incubator-seatunnel/pull/2410)) \ No newline at end of file +- [Feature] Support DB2 JDBC Sink ([2410](https://github.com/apache/incubator-seatunnel/pull/2410)) + +### next version + +- [Feature] Support Tablestore Sink ([3309](https://github.com/apache/incubator-seatunnel/pull/3309)) diff --git a/docs/en/connector-v2/source/Jdbc.md b/docs/en/connector-v2/source/Jdbc.md index 3e8d9180606..6fc41b8317a 100644 --- a/docs/en/connector-v2/source/Jdbc.md +++ b/docs/en/connector-v2/source/Jdbc.md @@ -90,17 +90,18 @@ in parallel according to the concurrency of tasks. there are some reference value for params above. -| datasource | driver | url | maven | -|------------|----------------------------------------------|--------------------------------------------------------------------|-------------------------------------------------------------------------------------------------------------| -| mysql | com.mysql.cj.jdbc.Driver | jdbc:mysql://localhost:3306/test | https://mvnrepository.com/artifact/mysql/mysql-connector-java | -| postgresql | org.postgresql.Driver | jdbc:postgresql://localhost:5432/postgres | https://mvnrepository.com/artifact/org.postgresql/postgresql | -| dm | dm.jdbc.driver.DmDriver | jdbc:dm://localhost:5236 | https://mvnrepository.com/artifact/com.dameng/DmJdbcDriver18 | -| phoenix | org.apache.phoenix.queryserver.client.Driver | jdbc:phoenix:thin:url=http://localhost:8765;serialization=PROTOBUF | https://mvnrepository.com/artifact/com.aliyun.phoenix/ali-phoenix-shaded-thin-client | -| sqlserver | com.microsoft.sqlserver.jdbc.SQLServerDriver | jdbc:microsoft:sqlserver://localhost:1433 | https://mvnrepository.com/artifact/com.microsoft.sqlserver/mssql-jdbc | -| oracle | oracle.jdbc.OracleDriver | jdbc:oracle:thin:@localhost:1521/xepdb1 | https://mvnrepository.com/artifact/com.oracle.database.jdbc/ojdbc8 | -| gbase8a | com.gbase.jdbc.Driver | jdbc:gbase://e2e_gbase8aDb:5258/test | https://www.gbase8.cn/wp-content/uploads/2020/10/gbase-connector-java-8.3.81.53-build55.5.7-bin_min_mix.jar | -| starrocks | com.mysql.cj.jdbc.Driver | jdbc:mysql://localhost:3306/test | https://mvnrepository.com/artifact/mysql/mysql-connector-java | -| db2 | com.ibm.db2.jcc.DB2Driver | jdbc:db2://localhost:50000/testdb | https://mvnrepository.com/artifact/com.ibm.db2.jcc/db2jcc/db2jcc4 | +| datasource | driver | url | maven | +|------------|------------------------------------------------------------------------|------------------------------------------------------------------------------------------------------|-------------------------------------------------------------------------------------------------------------| +| mysql | com.mysql.cj.jdbc.Driver | jdbc:mysql://localhost:3306/test | https://mvnrepository.com/artifact/mysql/mysql-connector-java | +| postgresql | org.postgresql.Driver | jdbc:postgresql://localhost:5432/postgres | https://mvnrepository.com/artifact/org.postgresql/postgresql | +| dm | dm.jdbc.driver.DmDriver | jdbc:dm://localhost:5236 | https://mvnrepository.com/artifact/com.dameng/DmJdbcDriver18 | +| phoenix | org.apache.phoenix.queryserver.client.Driver | jdbc:phoenix:thin:url=http://localhost:8765;serialization=PROTOBUF | https://mvnrepository.com/artifact/com.aliyun.phoenix/ali-phoenix-shaded-thin-client | +| sqlserver | com.microsoft.sqlserver.jdbc.SQLServerDriver | jdbc:microsoft:sqlserver://localhost:1433 | https://mvnrepository.com/artifact/com.microsoft.sqlserver/mssql-jdbc | +| oracle | oracle.jdbc.OracleDriver | jdbc:oracle:thin:@localhost:1521/xepdb1 | https://mvnrepository.com/artifact/com.oracle.database.jdbc/ojdbc8 | +| gbase8a | com.gbase.jdbc.Driver | jdbc:gbase://e2e_gbase8aDb:5258/test | https://www.gbase8.cn/wp-content/uploads/2020/10/gbase-connector-java-8.3.81.53-build55.5.7-bin_min_mix.jar | +| starrocks | com.mysql.cj.jdbc.Driver | jdbc:mysql://localhost:3306/test | https://mvnrepository.com/artifact/mysql/mysql-connector-java | +| db2 | com.ibm.db2.jcc.DB2Driver | jdbc:db2://localhost:50000/testdb | https://mvnrepository.com/artifact/com.ibm.db2.jcc/db2jcc/db2jcc4 | +| tablestore | com.alicloud.openservices.tablestore.jdbc.OTSDriver | jdbc:ots:https://myinstance.cn-hangzhou.ots.aliyuncs.com/myinstance | https://mvnrepository.com/artifact/com.aliyun.openservices/tablestore-jdbc | ## Example @@ -145,6 +146,8 @@ parallel: - [Feature] Support StarRocks JDBC Source ([3060](https://github.com/apache/incubator-seatunnel/pull/3060)) - [Feature] Support GBase8a JDBC Source ([3026](https://github.com/apache/incubator-seatunnel/pull/3026)) - [Feature] Support DB2 JDBC Source ([2410](https://github.com/apache/incubator-seatunnel/pull/2410)) + ### next version - [BugFix] Fix jdbc split bug ([3220](https://github.com/apache/incubator-seatunnel/pull/3220)) +- [Feature] Support Tablestore Source ([3309](https://github.com/apache/incubator-seatunnel/pull/3309)) From cbf1a6dd6c9cce00c4669aa29f7eb7d693656a5a Mon Sep 17 00:00:00 2001 From: liugddx <804167098@qq.com> Date: Mon, 7 Nov 2022 18:28:58 +0800 Subject: [PATCH 05/29] fix convert error --- .../jdbc/internal/dialect/tablestore/TablestoreTypeMapper.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/tablestore/TablestoreTypeMapper.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/tablestore/TablestoreTypeMapper.java index e9712995cde..d4eaefa79fa 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/tablestore/TablestoreTypeMapper.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/tablestore/TablestoreTypeMapper.java @@ -61,7 +61,7 @@ public SeaTunnelDataType mapping(ResultSetMetaData metadata, int colIndex) th case TABLESTORE_BIGINT: return BasicType.LONG_TYPE; case TABLESTORE_DOUBLE: - return new DecimalType(precision, scale); + return BasicType.DOUBLE_TYPE; case TABLESTORE_VARCHAR: case TABLESTORE_MEDIUMTEXT: return BasicType.STRING_TYPE; From 29c68970b6f6a536ff84b1a80ed31a549c33afec Mon Sep 17 00:00:00 2001 From: liugddx <804167098@qq.com> Date: Mon, 7 Nov 2022 18:30:48 +0800 Subject: [PATCH 06/29] fix deadlink error --- docs/en/connector-v2/sink/Jdbc.md | 2 +- docs/en/connector-v2/source/Jdbc.md | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/en/connector-v2/sink/Jdbc.md b/docs/en/connector-v2/sink/Jdbc.md index e1d1de3f270..9ce1a8b44f3 100644 --- a/docs/en/connector-v2/sink/Jdbc.md +++ b/docs/en/connector-v2/sink/Jdbc.md @@ -119,7 +119,7 @@ there are some reference value for params above. | GBase8a | com.gbase.jdbc.Driver | jdbc:gbase://e2e_gbase8aDb:5258/test | / | https://www.gbase8.cn/wp-content/uploads/2020/10/gbase-connector-java-8.3.81.53-build55.5.7-bin_min_mix.jar | | StarRocks | com.mysql.cj.jdbc.Driver | jdbc:mysql://localhost:3306/test | / | https://mvnrepository.com/artifact/mysql/mysql-connector-java | | db2 | com.ibm.db2.jcc.DB2Driver | jdbc:db2://localhost:50000/testdb | com.ibm.db2.jcc.DB2XADataSource | https://mvnrepository.com/artifact/com.ibm.db2.jcc/db2jcc/db2jcc4 | -| tablestore | com.alicloud.openservices.tablestore.jdbc.OTSDriver | jdbc:ots:https://myinstance.cn-hangzhou.ots.aliyuncs.com/myinstance | / | https://mvnrepository.com/artifact/com.aliyun.openservices/tablestore-jdbc | +| tablestore | com.alicloud.openservices.tablestore.jdbc.OTSDriver | "jdbc:ots:https://myinstance.cn-hangzhou.ots.aliyuncs.com/myinstance" | / | https://mvnrepository.com/artifact/com.aliyun.openservices/tablestore-jdbc | ## Example diff --git a/docs/en/connector-v2/source/Jdbc.md b/docs/en/connector-v2/source/Jdbc.md index 6fc41b8317a..596bb052725 100644 --- a/docs/en/connector-v2/source/Jdbc.md +++ b/docs/en/connector-v2/source/Jdbc.md @@ -101,7 +101,7 @@ there are some reference value for params above. | gbase8a | com.gbase.jdbc.Driver | jdbc:gbase://e2e_gbase8aDb:5258/test | https://www.gbase8.cn/wp-content/uploads/2020/10/gbase-connector-java-8.3.81.53-build55.5.7-bin_min_mix.jar | | starrocks | com.mysql.cj.jdbc.Driver | jdbc:mysql://localhost:3306/test | https://mvnrepository.com/artifact/mysql/mysql-connector-java | | db2 | com.ibm.db2.jcc.DB2Driver | jdbc:db2://localhost:50000/testdb | https://mvnrepository.com/artifact/com.ibm.db2.jcc/db2jcc/db2jcc4 | -| tablestore | com.alicloud.openservices.tablestore.jdbc.OTSDriver | jdbc:ots:https://myinstance.cn-hangzhou.ots.aliyuncs.com/myinstance | https://mvnrepository.com/artifact/com.aliyun.openservices/tablestore-jdbc | +| tablestore | com.alicloud.openservices.tablestore.jdbc.OTSDriver | "jdbc:ots:https://myinstance.cn-hangzhou.ots.aliyuncs.com/myinstance" | https://mvnrepository.com/artifact/com.aliyun.openservices/tablestore-jdbc | ## Example From 6e608a01e2e13a006cc01a3ffdbdd091945f7880 Mon Sep 17 00:00:00 2001 From: liugddx <804167098@qq.com> Date: Mon, 7 Nov 2022 22:35:08 +0800 Subject: [PATCH 07/29] fix code style error --- .../jdbc/internal/dialect/tablestore/TablestoreTypeMapper.java | 1 - 1 file changed, 1 deletion(-) diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/tablestore/TablestoreTypeMapper.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/tablestore/TablestoreTypeMapper.java index d4eaefa79fa..0811468f1f6 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/tablestore/TablestoreTypeMapper.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/tablestore/TablestoreTypeMapper.java @@ -18,7 +18,6 @@ package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.tablestore; import org.apache.seatunnel.api.table.type.BasicType; -import org.apache.seatunnel.api.table.type.DecimalType; import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType; import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper; From 53b5abb562c3ba9f9fb23222cf9faf20add2f8e1 Mon Sep 17 00:00:00 2001 From: liugddx <804167098@qq.com> Date: Mon, 7 Nov 2022 22:36:03 +0800 Subject: [PATCH 08/29] fix code style error --- .../jdbc/internal/dialect/tablestore/TablestoreTypeMapper.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/tablestore/TablestoreTypeMapper.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/tablestore/TablestoreTypeMapper.java index 0811468f1f6..56b8fa50748 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/tablestore/TablestoreTypeMapper.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/tablestore/TablestoreTypeMapper.java @@ -52,8 +52,6 @@ public class TablestoreTypeMapper implements JdbcDialectTypeMapper { @Override public SeaTunnelDataType mapping(ResultSetMetaData metadata, int colIndex) throws SQLException { String tablestoreServerType = metadata.getColumnTypeName(colIndex).toUpperCase(); - int precision = metadata.getPrecision(colIndex); - int scale = metadata.getScale(colIndex); switch (tablestoreServerType) { case TABLESTORE_BOOL: return BasicType.BOOLEAN_TYPE; From 8359747b97d26bd303e265f6ea8b2fc8024fcb06 Mon Sep 17 00:00:00 2001 From: liugddx <804167098@qq.com> Date: Mon, 7 Nov 2022 23:12:49 +0800 Subject: [PATCH 09/29] revert table sink --- docs/en/connector-v2/sink/Jdbc.md | 5 ----- 1 file changed, 5 deletions(-) diff --git a/docs/en/connector-v2/sink/Jdbc.md b/docs/en/connector-v2/sink/Jdbc.md index 9ce1a8b44f3..627f3e5b73d 100644 --- a/docs/en/connector-v2/sink/Jdbc.md +++ b/docs/en/connector-v2/sink/Jdbc.md @@ -119,7 +119,6 @@ there are some reference value for params above. | GBase8a | com.gbase.jdbc.Driver | jdbc:gbase://e2e_gbase8aDb:5258/test | / | https://www.gbase8.cn/wp-content/uploads/2020/10/gbase-connector-java-8.3.81.53-build55.5.7-bin_min_mix.jar | | StarRocks | com.mysql.cj.jdbc.Driver | jdbc:mysql://localhost:3306/test | / | https://mvnrepository.com/artifact/mysql/mysql-connector-java | | db2 | com.ibm.db2.jcc.DB2Driver | jdbc:db2://localhost:50000/testdb | com.ibm.db2.jcc.DB2XADataSource | https://mvnrepository.com/artifact/com.ibm.db2.jcc/db2jcc/db2jcc4 | -| tablestore | com.alicloud.openservices.tablestore.jdbc.OTSDriver | "jdbc:ots:https://myinstance.cn-hangzhou.ots.aliyuncs.com/myinstance" | / | https://mvnrepository.com/artifact/com.aliyun.openservices/tablestore-jdbc | ## Example @@ -169,7 +168,3 @@ jdbc { - [Feature] Support Oracle JDBC Source ([2550](https://github.com/apache/incubator-seatunnel/pull/2550)) - [Feature] Support StarRocks JDBC Source ([3060](https://github.com/apache/incubator-seatunnel/pull/3060)) - [Feature] Support DB2 JDBC Sink ([2410](https://github.com/apache/incubator-seatunnel/pull/2410)) - -### next version - -- [Feature] Support Tablestore Sink ([3309](https://github.com/apache/incubator-seatunnel/pull/3309)) From 0785f3b59ba83aad298567386e432a7fef520860 Mon Sep 17 00:00:00 2001 From: liugddx <804167098@qq.com> Date: Mon, 7 Nov 2022 23:13:54 +0800 Subject: [PATCH 10/29] fix deadlink --- docs/en/connector-v2/source/Jdbc.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/en/connector-v2/source/Jdbc.md b/docs/en/connector-v2/source/Jdbc.md index 596bb052725..92b40b489f4 100644 --- a/docs/en/connector-v2/source/Jdbc.md +++ b/docs/en/connector-v2/source/Jdbc.md @@ -101,7 +101,7 @@ there are some reference value for params above. | gbase8a | com.gbase.jdbc.Driver | jdbc:gbase://e2e_gbase8aDb:5258/test | https://www.gbase8.cn/wp-content/uploads/2020/10/gbase-connector-java-8.3.81.53-build55.5.7-bin_min_mix.jar | | starrocks | com.mysql.cj.jdbc.Driver | jdbc:mysql://localhost:3306/test | https://mvnrepository.com/artifact/mysql/mysql-connector-java | | db2 | com.ibm.db2.jcc.DB2Driver | jdbc:db2://localhost:50000/testdb | https://mvnrepository.com/artifact/com.ibm.db2.jcc/db2jcc/db2jcc4 | -| tablestore | com.alicloud.openservices.tablestore.jdbc.OTSDriver | "jdbc:ots:https://myinstance.cn-hangzhou.ots.aliyuncs.com/myinstance" | https://mvnrepository.com/artifact/com.aliyun.openservices/tablestore-jdbc | +| tablestore | com.alicloud.openservices.tablestore.jdbc.OTSDriver | "jdbc:ots:http s://myinstance.cn-hangzhou.ots.aliyuncs.com/myinstance" | https://mvnrepository.com/artifact/com.aliyun.openservices/tablestore-jdbc | ## Example From 0de05f5928edf0954e899522190da6710caaabf1 Mon Sep 17 00:00:00 2001 From: liugddx <804167098@qq.com> Date: Tue, 8 Nov 2022 23:48:46 +0800 Subject: [PATCH 11/29] add tablestore sink --- .../connector-tablestore/pom.xml | 49 ++++++ .../tablestore/config/TablestoreConfig.java | 31 ++++ .../tablestore/config/TablestoreOptions.java | 67 ++++++++ .../DefaultSeaTunnelRowSerializer.java | 151 ++++++++++++++++++ .../serialize/SeaTunnelRowSerializer.java | 27 ++++ .../tablestore/sink/TablestoreSink.java | 81 ++++++++++ .../tablestore/sink/TablestoreSinkClient.java | 116 ++++++++++++++ .../tablestore/sink/TablestoreWriter.java | 48 ++++++ seatunnel-connectors-v2/pom.xml | 1 + 9 files changed, 571 insertions(+) create mode 100644 seatunnel-connectors-v2/connector-tablestore/pom.xml create mode 100644 seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreConfig.java create mode 100644 seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreOptions.java create mode 100644 seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/serialize/DefaultSeaTunnelRowSerializer.java create mode 100644 seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/serialize/SeaTunnelRowSerializer.java create mode 100644 seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreSink.java create mode 100644 seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreSinkClient.java create mode 100644 seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreWriter.java diff --git a/seatunnel-connectors-v2/connector-tablestore/pom.xml b/seatunnel-connectors-v2/connector-tablestore/pom.xml new file mode 100644 index 00000000000..e12a0c72d64 --- /dev/null +++ b/seatunnel-connectors-v2/connector-tablestore/pom.xml @@ -0,0 +1,49 @@ + + + + + seatunnel-connectors-v2 + org.apache.seatunnel + ${revision} + + 4.0.0 + + connector-tablestore + + + 5.13.9 + + + + + org.apache.seatunnel + connector-common + ${project.version} + + + com.aliyun.openservices + tablestore + ${tablestore.version} + + + + diff --git a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreConfig.java b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreConfig.java new file mode 100644 index 00000000000..0fb02103ba7 --- /dev/null +++ b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreConfig.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.tablestore.config; + +import java.io.Serializable; + +public class TablestoreConfig implements Serializable { + public static final String END_POINT = "end_point"; + public static final String INSTANCE_NAME = "instance_name"; + public static final String ACCESS_KEY_ID = "access_key_id"; + public static final String ACCESS_KEY_SECRET = "access_key_secret"; + public static final String TABLE = "table"; + public static final String BATCH_SIZE = "batch_size"; + public static final String DEFAULT_BATCH_INTERVAL_MS = "batch_interval_ms"; + public static final String PRIMARY_KEYS = "primary_keys"; +} diff --git a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreOptions.java b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreOptions.java new file mode 100644 index 00000000000..ce53697ac9c --- /dev/null +++ b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreOptions.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.tablestore.config; + +import org.apache.seatunnel.shade.com.typesafe.config.Config; + +import lombok.AllArgsConstructor; +import lombok.Data; + +import java.io.Serializable; +import java.util.List; + +@Data +@AllArgsConstructor +public class TablestoreOptions implements Serializable { + + private static final int DEFAULT_BATCH_SIZE = 25; + private static final int DEFAULT_BATCH_INTERVAL_MS = 1000; + + private String endpoint; + + private String instanceName; + + private String accessKeyId; + + private String accessKeySecret; + + private String table; + + private List primaryKeys; + + public int batchSize = DEFAULT_BATCH_SIZE; + public int batchIntervalMs = DEFAULT_BATCH_INTERVAL_MS; + + public TablestoreOptions(Config config) { + this.endpoint = config.getString(TablestoreConfig.END_POINT); + this.instanceName = config.getString(TablestoreConfig.INSTANCE_NAME); + this.accessKeyId = config.getString(TablestoreConfig.ACCESS_KEY_ID); + this.accessKeySecret = config.getString(TablestoreConfig.ACCESS_KEY_SECRET); + this.table = config.getString(TablestoreConfig.TABLE); + + if (config.hasPath(TablestoreConfig.BATCH_SIZE)) { + this.batchSize = config.getInt(TablestoreConfig.BATCH_SIZE); + } + if (config.hasPath(TablestoreConfig.DEFAULT_BATCH_INTERVAL_MS)) { + this.batchIntervalMs = config.getInt(TablestoreConfig.DEFAULT_BATCH_INTERVAL_MS); + } + if (config.hasPath(TablestoreConfig.PRIMARY_KEYS)) { + this.primaryKeys = config.getStringList(TablestoreConfig.PRIMARY_KEYS); + } + } +} diff --git a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/serialize/DefaultSeaTunnelRowSerializer.java b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/serialize/DefaultSeaTunnelRowSerializer.java new file mode 100644 index 00000000000..5e649dcb756 --- /dev/null +++ b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/serialize/DefaultSeaTunnelRowSerializer.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.tablestore.serialize; + +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreOptions; + +import com.alicloud.openservices.tablestore.model.Column; +import com.alicloud.openservices.tablestore.model.ColumnType; +import com.alicloud.openservices.tablestore.model.ColumnValue; +import com.alicloud.openservices.tablestore.model.PrimaryKeyBuilder; +import com.alicloud.openservices.tablestore.model.PrimaryKeyColumn; +import com.alicloud.openservices.tablestore.model.PrimaryKeyType; +import com.alicloud.openservices.tablestore.model.PrimaryKeyValue; +import com.alicloud.openservices.tablestore.model.RowPutChange; + +public class DefaultSeaTunnelRowSerializer implements SeaTunnelRowSerializer { + + private final SeaTunnelRowType seaTunnelRowType; + private final TablestoreOptions tablestoreOptions; + + public DefaultSeaTunnelRowSerializer(SeaTunnelRowType seaTunnelRowType, TablestoreOptions tablestoreOptions) { + this.seaTunnelRowType = seaTunnelRowType; + this.tablestoreOptions = tablestoreOptions; + } + + @Override + public RowPutChange serialize(SeaTunnelRow seaTunnelRow) { + + PrimaryKeyBuilder primaryKeyBuilder = PrimaryKeyBuilder.createPrimaryKeyBuilder(); + tablestoreOptions.getPrimaryKeys().forEach(primaryKeyName -> { + Object field = seaTunnelRow.getField(seaTunnelRowType.indexOf(primaryKeyName)); + primaryKeyBuilder.addPrimaryKeyColumn( + this.convertPrimaryKeyColumn(primaryKeyName, field, + this.convertPrimaryKeyType(seaTunnelRowType.getFieldType(seaTunnelRowType.indexOf(primaryKeyName))))); + + }); + + RowPutChange rowPutChange = new RowPutChange(tablestoreOptions.getTable(), primaryKeyBuilder.build()); + for (int index = 0; index < seaTunnelRowType.getFieldNames().length; index++) { + Object fieldValue = seaTunnelRow.getField(index); + String fieldName = seaTunnelRowType.getFieldName(index); + + rowPutChange.addColumn(this.convertColumn(fieldName, fieldValue, + this.convertColumnType(seaTunnelRowType.getFieldType(index)))); + } + return rowPutChange; + } + + private ColumnType convertColumnType(SeaTunnelDataType seaTunnelDataType) { + switch (seaTunnelDataType.getSqlType()) { + case INT: + case TINYINT: + case SMALLINT: + case BIGINT: + return ColumnType.INTEGER; + case FLOAT: + case DOUBLE: + case DECIMAL: + return ColumnType.DOUBLE; + case STRING: + case DATE: + case TIME: + case TIMESTAMP: + return ColumnType.STRING; + case BOOLEAN: + return ColumnType.BOOLEAN; + case BYTES: + return ColumnType.BINARY; + default: + throw new UnsupportedOperationException("Unsupported columnType: " + seaTunnelDataType); + } + } + + private PrimaryKeyType convertPrimaryKeyType(SeaTunnelDataType seaTunnelDataType) { + switch (seaTunnelDataType.getSqlType()) { + case INT: + case TINYINT: + case SMALLINT: + case BIGINT: + return PrimaryKeyType.INTEGER; + case FLOAT: + case DOUBLE: + case DECIMAL: + case STRING: + case DATE: + case TIME: + case TIMESTAMP: + case BOOLEAN: + return PrimaryKeyType.STRING; + case BYTES: + return PrimaryKeyType.BINARY; + default: + throw new UnsupportedOperationException("Unsupported primaryKeyType: " + seaTunnelDataType); + } + } + + private Column convertColumn(String columnName, Object value, ColumnType columnType) { + if (value == null) { + return null; + } + switch (columnType) { + case STRING: + return new Column(columnName, ColumnValue.fromString(String.valueOf(value))); + case INTEGER: + return new Column(columnName, ColumnValue.fromLong((long) value)); + case BOOLEAN: + return new Column(columnName, ColumnValue.fromBoolean((boolean) value)); + case DOUBLE: + return new Column(columnName, ColumnValue.fromDouble((Double) value)); + case BINARY: + return new Column(columnName, ColumnValue.fromBinary((byte[]) value)); + default: + throw new UnsupportedOperationException("Unsupported columnType: " + columnType); + } + } + + private PrimaryKeyColumn convertPrimaryKeyColumn(String columnName, Object value, PrimaryKeyType primaryKeyType) { + if (value == null) { + return null; + } + switch (primaryKeyType) { + case STRING: + return new PrimaryKeyColumn(columnName, PrimaryKeyValue.fromString(String.valueOf(value))); + case INTEGER: + return new PrimaryKeyColumn(columnName, PrimaryKeyValue.fromLong((long) value)); + case BINARY: + return new PrimaryKeyColumn(columnName, PrimaryKeyValue.fromBinary((byte[]) value)); + default: + throw new UnsupportedOperationException("Unsupported primaryKeyType: " + primaryKeyType); + } + } + +} diff --git a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/serialize/SeaTunnelRowSerializer.java b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/serialize/SeaTunnelRowSerializer.java new file mode 100644 index 00000000000..24a9eab8d1a --- /dev/null +++ b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/serialize/SeaTunnelRowSerializer.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.tablestore.serialize; + +import org.apache.seatunnel.api.table.type.SeaTunnelRow; + +import com.alicloud.openservices.tablestore.model.RowPutChange; + +public interface SeaTunnelRowSerializer { + + RowPutChange serialize(SeaTunnelRow seaTunnelRow); +} diff --git a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreSink.java b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreSink.java new file mode 100644 index 00000000000..9160bcf4ca8 --- /dev/null +++ b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreSink.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.tablestore.sink; + +import static org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.ACCESS_KEY_ID; +import static org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.ACCESS_KEY_SECRET; +import static org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.END_POINT; +import static org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.INSTANCE_NAME; +import static org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.PRIMARY_KEYS; +import static org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.TABLE; + +import org.apache.seatunnel.api.common.PrepareFailException; +import org.apache.seatunnel.api.sink.SeaTunnelSink; +import org.apache.seatunnel.api.sink.SinkWriter; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.common.config.CheckConfigUtil; +import org.apache.seatunnel.common.config.CheckResult; +import org.apache.seatunnel.common.constants.PluginType; +import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink; +import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter; +import org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreOptions; + +import org.apache.seatunnel.shade.com.typesafe.config.Config; + +import com.google.auto.service.AutoService; + +import java.io.IOException; + +@AutoService(SeaTunnelSink.class) +public class TablestoreSink extends AbstractSimpleSink { + + private SeaTunnelRowType rowType; + + private TablestoreOptions tablestoreOptions; + + @Override + public String getPluginName() { + return "Tablestore"; + } + + @Override + public void prepare(Config pluginConfig) throws PrepareFailException { + CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, END_POINT, TABLE, INSTANCE_NAME, ACCESS_KEY_ID, ACCESS_KEY_SECRET, PRIMARY_KEYS); + if (!result.isSuccess()) { + throw new PrepareFailException(getPluginName(), PluginType.SOURCE, result.getMsg()); + } + tablestoreOptions = new TablestoreOptions(pluginConfig); + } + + @Override + public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) { + this.rowType = seaTunnelRowType; + } + + @Override + public SeaTunnelDataType getConsumedType() { + return rowType; + } + + @Override + public AbstractSinkWriter createWriter(SinkWriter.Context context) throws IOException { + return new TablestoreWriter(tablestoreOptions, rowType); + } +} diff --git a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreSinkClient.java b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreSinkClient.java new file mode 100644 index 00000000000..13977d92844 --- /dev/null +++ b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreSinkClient.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.tablestore.sink; + +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreOptions; + +import com.alicloud.openservices.tablestore.SyncClient; +import com.alicloud.openservices.tablestore.model.BatchWriteRowRequest; +import com.alicloud.openservices.tablestore.model.RowPutChange; +import com.google.common.util.concurrent.ThreadFactoryBuilder; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +public class TablestoreSinkClient { + private final TablestoreOptions tablestoreOptions; + private ScheduledExecutorService scheduler; + private ScheduledFuture scheduledFuture; + private volatile boolean initialize; + private volatile Exception flushException; + private SyncClient syncClient; + private final List batchList; + + public TablestoreSinkClient(TablestoreOptions tablestoreOptions, SeaTunnelRowType typeInfo) { + this.tablestoreOptions = tablestoreOptions; + this.batchList = new ArrayList<>(); + } + + private void tryInit() throws IOException { + if (initialize) { + return; + } + syncClient = new SyncClient( + tablestoreOptions.getEndpoint(), + tablestoreOptions.getAccessKeyId(), + tablestoreOptions.getAccessKeySecret(), + tablestoreOptions.getInstanceName()); + + scheduler = Executors.newSingleThreadScheduledExecutor( + new ThreadFactoryBuilder().setNameFormat("Tablestore-sink-output-%s").build()); + scheduledFuture = scheduler.scheduleAtFixedRate( + () -> { + try { + flush(); + } catch (IOException e) { + flushException = e; + } + }, + tablestoreOptions.getBatchIntervalMs(), + tablestoreOptions.getBatchIntervalMs(), + TimeUnit.MILLISECONDS); + + initialize = true; + } + + public synchronized void write(RowPutChange rowPutChange) throws IOException { + tryInit(); + checkFlushException(); + batchList.add(rowPutChange); + if (tablestoreOptions.getBatchSize() > 0 + && batchList.size() >= tablestoreOptions.getBatchSize()) { + flush(); + } + } + + public synchronized void close() throws IOException { + if (scheduledFuture != null) { + scheduledFuture.cancel(false); + scheduler.shutdown(); + } + if (syncClient != null) { + flush(); + syncClient.shutdown(); + } + } + + synchronized void flush() throws IOException { + checkFlushException(); + if (batchList.isEmpty()) { + return; + } + BatchWriteRowRequest batchWriteRowRequest = new BatchWriteRowRequest(); + batchList.forEach(batchWriteRowRequest::addRowChange); + syncClient.batchWriteRow(batchWriteRowRequest); + + batchList.clear(); + } + + private void checkFlushException() { + if (flushException != null) { + throw new RuntimeException("Writing items to Tablestore failed.", flushException); + } + } + +} diff --git a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreWriter.java b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreWriter.java new file mode 100644 index 00000000000..b156d731778 --- /dev/null +++ b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreWriter.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.tablestore.sink; + +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter; +import org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreOptions; +import org.apache.seatunnel.connectors.seatunnel.tablestore.serialize.DefaultSeaTunnelRowSerializer; +import org.apache.seatunnel.connectors.seatunnel.tablestore.serialize.SeaTunnelRowSerializer; + +import java.io.IOException; + +public class TablestoreWriter extends AbstractSinkWriter { + + private final TablestoreSinkClient tablestoreSinkClient; + private final SeaTunnelRowSerializer serializer; + + public TablestoreWriter(TablestoreOptions tablestoreOptions, SeaTunnelRowType seaTunnelRowType) { + tablestoreSinkClient = new TablestoreSinkClient(tablestoreOptions, seaTunnelRowType); + serializer = new DefaultSeaTunnelRowSerializer(seaTunnelRowType, tablestoreOptions); + } + + @Override + public void write(SeaTunnelRow element) throws IOException { + tablestoreSinkClient.write(serializer.serialize(element)); + } + + @Override + public void close() throws IOException { + tablestoreSinkClient.close(); + } +} diff --git a/seatunnel-connectors-v2/pom.xml b/seatunnel-connectors-v2/pom.xml index 8ca99e023cf..7a3a6587efe 100644 --- a/seatunnel-connectors-v2/pom.xml +++ b/seatunnel-connectors-v2/pom.xml @@ -57,6 +57,7 @@ connector-iceberg connector-influxdb connector-amazondynamodb + connector-tablestore From 72f75d49a7de76c8e32012f2e81350ae642767db Mon Sep 17 00:00:00 2001 From: liugddx <804167098@qq.com> Date: Wed, 9 Nov 2022 18:33:47 +0800 Subject: [PATCH 12/29] fix type error --- .../DefaultSeaTunnelRowSerializer.java | 29 +++++++++---------- 1 file changed, 14 insertions(+), 15 deletions(-) diff --git a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/serialize/DefaultSeaTunnelRowSerializer.java b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/serialize/DefaultSeaTunnelRowSerializer.java index 5e649dcb756..98f9d99d212 100644 --- a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/serialize/DefaultSeaTunnelRowSerializer.java +++ b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/serialize/DefaultSeaTunnelRowSerializer.java @@ -31,6 +31,8 @@ import com.alicloud.openservices.tablestore.model.PrimaryKeyValue; import com.alicloud.openservices.tablestore.model.RowPutChange; +import java.util.Arrays; + public class DefaultSeaTunnelRowSerializer implements SeaTunnelRowSerializer { private final SeaTunnelRowType seaTunnelRowType; @@ -45,22 +47,19 @@ public DefaultSeaTunnelRowSerializer(SeaTunnelRowType seaTunnelRowType, Tablesto public RowPutChange serialize(SeaTunnelRow seaTunnelRow) { PrimaryKeyBuilder primaryKeyBuilder = PrimaryKeyBuilder.createPrimaryKeyBuilder(); - tablestoreOptions.getPrimaryKeys().forEach(primaryKeyName -> { - Object field = seaTunnelRow.getField(seaTunnelRowType.indexOf(primaryKeyName)); - primaryKeyBuilder.addPrimaryKeyColumn( - this.convertPrimaryKeyColumn(primaryKeyName, field, - this.convertPrimaryKeyType(seaTunnelRowType.getFieldType(seaTunnelRowType.indexOf(primaryKeyName))))); - - }); - RowPutChange rowPutChange = new RowPutChange(tablestoreOptions.getTable(), primaryKeyBuilder.build()); - for (int index = 0; index < seaTunnelRowType.getFieldNames().length; index++) { - Object fieldValue = seaTunnelRow.getField(index); - String fieldName = seaTunnelRowType.getFieldName(index); - - rowPutChange.addColumn(this.convertColumn(fieldName, fieldValue, - this.convertColumnType(seaTunnelRowType.getFieldType(index)))); - } + Arrays.stream(seaTunnelRowType.getFieldNames()).forEach(fieldName -> { + Object field = seaTunnelRow.getField(seaTunnelRowType.indexOf(fieldName)); + int index = seaTunnelRowType.indexOf(fieldName); + if (tablestoreOptions.getPrimaryKeys().contains(fieldName)) { + primaryKeyBuilder.addPrimaryKeyColumn( + this.convertPrimaryKeyColumn(fieldName, field, + this.convertPrimaryKeyType(seaTunnelRowType.getFieldType(index)))); + } else { + rowPutChange.addColumn(this.convertColumn(fieldName, field, + this.convertColumnType(seaTunnelRowType.getFieldType(index)))); + } + }); return rowPutChange; } From 51a56a7907e4385553ccf580a539074722215129 Mon Sep 17 00:00:00 2001 From: liugddx <804167098@qq.com> Date: Wed, 9 Nov 2022 18:37:10 +0800 Subject: [PATCH 13/29] add tablestore sink --- plugin-mapping.properties | 3 ++- seatunnel-dist/pom.xml | 7 ++++++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/plugin-mapping.properties b/plugin-mapping.properties index e7fe7c79ff8..ff54c1195e7 100644 --- a/plugin-mapping.properties +++ b/plugin-mapping.properties @@ -143,4 +143,5 @@ seatunnel.source.Cassandra = connector-cassandra seatunnel.sink.Cassandra = connector-cassandra seatunnel.sink.StarRocks = connector-starrocks seatunnel.source.MyHours = connector-http-myhours -seatunnel.sink.InfluxDB = connector-influxdb \ No newline at end of file +seatunnel.sink.InfluxDB = connector-influxdb +seatunnel.sink.Tablestore = connector-tablestore diff --git a/seatunnel-dist/pom.xml b/seatunnel-dist/pom.xml index 3cf94f3b1b7..036d9620e57 100644 --- a/seatunnel-dist/pom.xml +++ b/seatunnel-dist/pom.xml @@ -315,6 +315,11 @@ ${project.version} provided + + org.apache.seatunnel + connector-tablestore + ${project.version} + provided @@ -671,7 +676,7 @@ ${project.version} provided - + org.apache.seatunnel From b736f234e9f093ec56b7f900516c5eb3f2b39673 Mon Sep 17 00:00:00 2001 From: liugddx <804167098@qq.com> Date: Wed, 9 Nov 2022 20:15:13 +0800 Subject: [PATCH 14/29] fix tablestore sink bug --- docs/en/connector-v2/sink/Tablestore.md | 74 +++++++++++++++++++ .../DefaultSeaTunnelRowSerializer.java | 9 ++- 2 files changed, 81 insertions(+), 2 deletions(-) create mode 100644 docs/en/connector-v2/sink/Tablestore.md diff --git a/docs/en/connector-v2/sink/Tablestore.md b/docs/en/connector-v2/sink/Tablestore.md new file mode 100644 index 00000000000..c78521d263f --- /dev/null +++ b/docs/en/connector-v2/sink/Tablestore.md @@ -0,0 +1,74 @@ +# Tablestore + +> Tablestore sink connector + +## Description + +Write data to `Tablestore` + +## Key features + +- [ ] [exactly-once](../../concept/connector-v2-features.md) +- [ ] [schema projection](../../concept/connector-v2-features.md) + +## Options + +| name | type | required | default value | +|----------------- | ------ |----------| ------------- | +| end_point | string | yes | - | +| instance_name | string | yes | - | +| access_key_id | string | yes | - | +| access_key_secret| string | yes | - | +| table | string | yes | - | +| primary_keys | array | yes | - | +| batch_size | string | no | 25 | +| batch_interval_ms| string | no | 1000 | +| common-options | config | no | - | + +### end_point [string] + +endPoint to write to Tablestore. + +### instanceName [string] + +The instanceName of Tablestore. + +### access_key_id [string] + +The access id of Tablestore. + +### access_key_secret [string] + +The access secret of Tablestore. + +### table [string] + +The table of Tablestore. + +### primaryKeys [array] + +The primaryKeys of Tablestore. + +### common options + +Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details. + +## Example + +```bash +Tablestore { + end_point = "xxxx" + instance_name = "xxxx" + access_key_id = "xxxx" + access_key_secret = "xxxx" + table = "sink" + primary_keys = ["pk_1","pk_2","pk_3","pk_4"] + } +``` + +## Changelog + +### next version + +- Add Tablestore Sink Connector + diff --git a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/serialize/DefaultSeaTunnelRowSerializer.java b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/serialize/DefaultSeaTunnelRowSerializer.java index 98f9d99d212..6e59236fff7 100644 --- a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/serialize/DefaultSeaTunnelRowSerializer.java +++ b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/serialize/DefaultSeaTunnelRowSerializer.java @@ -31,7 +31,9 @@ import com.alicloud.openservices.tablestore.model.PrimaryKeyValue; import com.alicloud.openservices.tablestore.model.RowPutChange; +import java.util.ArrayList; import java.util.Arrays; +import java.util.List; public class DefaultSeaTunnelRowSerializer implements SeaTunnelRowSerializer { @@ -47,7 +49,7 @@ public DefaultSeaTunnelRowSerializer(SeaTunnelRowType seaTunnelRowType, Tablesto public RowPutChange serialize(SeaTunnelRow seaTunnelRow) { PrimaryKeyBuilder primaryKeyBuilder = PrimaryKeyBuilder.createPrimaryKeyBuilder(); - RowPutChange rowPutChange = new RowPutChange(tablestoreOptions.getTable(), primaryKeyBuilder.build()); + List columns = new ArrayList<>(seaTunnelRow.getFields().length - tablestoreOptions.getPrimaryKeys().size()); Arrays.stream(seaTunnelRowType.getFieldNames()).forEach(fieldName -> { Object field = seaTunnelRow.getField(seaTunnelRowType.indexOf(fieldName)); int index = seaTunnelRowType.indexOf(fieldName); @@ -56,10 +58,13 @@ public RowPutChange serialize(SeaTunnelRow seaTunnelRow) { this.convertPrimaryKeyColumn(fieldName, field, this.convertPrimaryKeyType(seaTunnelRowType.getFieldType(index)))); } else { - rowPutChange.addColumn(this.convertColumn(fieldName, field, + columns.add(this.convertColumn(fieldName, field, this.convertColumnType(seaTunnelRowType.getFieldType(index)))); } }); + RowPutChange rowPutChange = new RowPutChange(tablestoreOptions.getTable(), primaryKeyBuilder.build()); + columns.forEach(rowPutChange::addColumn); + return rowPutChange; } From 448a619e6ae661ff16ed319a7b8d9cf5a43024bf Mon Sep 17 00:00:00 2001 From: liugddx <804167098@qq.com> Date: Wed, 9 Nov 2022 21:51:29 +0800 Subject: [PATCH 15/29] fix tablestore sink bug --- .../serialize/DefaultSeaTunnelRowSerializer.java | 3 +++ .../tablestore/sink/TablestoreSinkClient.java | 13 ++++++++++++- 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/serialize/DefaultSeaTunnelRowSerializer.java b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/serialize/DefaultSeaTunnelRowSerializer.java index 6e59236fff7..0f3e7f3e2b1 100644 --- a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/serialize/DefaultSeaTunnelRowSerializer.java +++ b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/serialize/DefaultSeaTunnelRowSerializer.java @@ -25,10 +25,12 @@ import com.alicloud.openservices.tablestore.model.Column; import com.alicloud.openservices.tablestore.model.ColumnType; import com.alicloud.openservices.tablestore.model.ColumnValue; +import com.alicloud.openservices.tablestore.model.Condition; import com.alicloud.openservices.tablestore.model.PrimaryKeyBuilder; import com.alicloud.openservices.tablestore.model.PrimaryKeyColumn; import com.alicloud.openservices.tablestore.model.PrimaryKeyType; import com.alicloud.openservices.tablestore.model.PrimaryKeyValue; +import com.alicloud.openservices.tablestore.model.RowExistenceExpectation; import com.alicloud.openservices.tablestore.model.RowPutChange; import java.util.ArrayList; @@ -63,6 +65,7 @@ public RowPutChange serialize(SeaTunnelRow seaTunnelRow) { } }); RowPutChange rowPutChange = new RowPutChange(tablestoreOptions.getTable(), primaryKeyBuilder.build()); + rowPutChange.setCondition(new Condition(RowExistenceExpectation.IGNORE)); columns.forEach(rowPutChange::addColumn); return rowPutChange; diff --git a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreSinkClient.java b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreSinkClient.java index 13977d92844..fb5f20619c0 100644 --- a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreSinkClient.java +++ b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreSinkClient.java @@ -18,12 +18,15 @@ package org.apache.seatunnel.connectors.seatunnel.tablestore.sink; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.common.utils.SeaTunnelException; import org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreOptions; import com.alicloud.openservices.tablestore.SyncClient; import com.alicloud.openservices.tablestore.model.BatchWriteRowRequest; +import com.alicloud.openservices.tablestore.model.BatchWriteRowResponse; import com.alicloud.openservices.tablestore.model.RowPutChange; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import lombok.extern.slf4j.Slf4j; import java.io.IOException; import java.util.ArrayList; @@ -33,6 +36,7 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +@Slf4j public class TablestoreSinkClient { private final TablestoreOptions tablestoreOptions; private ScheduledExecutorService scheduler; @@ -102,7 +106,14 @@ synchronized void flush() throws IOException { } BatchWriteRowRequest batchWriteRowRequest = new BatchWriteRowRequest(); batchList.forEach(batchWriteRowRequest::addRowChange); - syncClient.batchWriteRow(batchWriteRowRequest); + BatchWriteRowResponse response = syncClient.batchWriteRow(batchWriteRowRequest); + + if (!response.isAllSucceed()) { + for (BatchWriteRowResponse.RowResult rowResult : response.getFailedRows()) { + throw new SeaTunnelException("Code: " + rowResult.getError().getCode() + + "Message:" + rowResult.getError().getMessage()); + } + } batchList.clear(); } From 40d20183c3c6a883cef73cc8ca191aacac680421 Mon Sep 17 00:00:00 2001 From: liugddx <804167098@qq.com> Date: Fri, 11 Nov 2022 22:40:11 +0800 Subject: [PATCH 16/29] fix cr error --- docs/en/connector-v2/sink/Jdbc.md | 24 +++++------ docs/en/connector-v2/sink/Tablestore.md | 2 +- .../tablestore/config/TablestoreConfig.java | 43 +++++++++++++++---- .../tablestore/config/TablestoreOptions.java | 4 +- .../tablestore/sink/TablestoreSink.java | 2 +- .../tablestore/sink/TablestoreSinkClient.java | 4 +- .../sink/TablestoreSinkFactory.java | 41 ++++++++++++++++++ seatunnel-dist/pom.xml | 3 +- 8 files changed, 95 insertions(+), 28 deletions(-) create mode 100644 seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreSinkFactory.java diff --git a/docs/en/connector-v2/sink/Jdbc.md b/docs/en/connector-v2/sink/Jdbc.md index 627f3e5b73d..1414faeaa9a 100644 --- a/docs/en/connector-v2/sink/Jdbc.md +++ b/docs/en/connector-v2/sink/Jdbc.md @@ -108,17 +108,17 @@ In the case of is_exactly_once = "true", Xa transactions are used. This requires there are some reference value for params above. -| datasource | driver | url | xa_data_source_class_name | maven | -|------------|-------------------------------------------------------------------------|------------------------------------------------------------------------------------------------------|----------------------------------------------------|-------------------------------------------------------------------------------------------------------------| -| MySQL | com.mysql.cj.jdbc.Driver | jdbc:mysql://localhost:3306/test | com.mysql.cj.jdbc.MysqlXADataSource | https://mvnrepository.com/artifact/mysql/mysql-connector-java | -| PostgreSQL | org.postgresql.Driver | jdbc:postgresql://localhost:5432/postgres | org.postgresql.xa.PGXADataSource | https://mvnrepository.com/artifact/org.postgresql/postgresql | -| DM | dm.jdbc.driver.DmDriver | jdbc:dm://localhost:5236 | dm.jdbc.driver.DmdbXADataSource | https://mvnrepository.com/artifact/com.dameng/DmJdbcDriver18 | -| Phoenix | org.apache.phoenix.queryserver.client.Driver | jdbc:phoenix:thin:url=http://localhost:8765;serialization=PROTOBUF | / | https://mvnrepository.com/artifact/com.aliyun.phoenix/ali-phoenix-shaded-thin-client | -| SQL Server | com.microsoft.sqlserver.jdbc.SQLServerDriver | jdbc:microsoft:sqlserver://localhost:1433 | com.microsoft.sqlserver.jdbc.SQLServerXADataSource | https://mvnrepository.com/artifact/com.microsoft.sqlserver/mssql-jdbc | -| Oracle | oracle.jdbc.OracleDriver | jdbc:oracle:thin:@localhost:1521/xepdb1 | oracle.jdbc.xa.OracleXADataSource | https://mvnrepository.com/artifact/com.oracle.database.jdbc/ojdbc8 | -| GBase8a | com.gbase.jdbc.Driver | jdbc:gbase://e2e_gbase8aDb:5258/test | / | https://www.gbase8.cn/wp-content/uploads/2020/10/gbase-connector-java-8.3.81.53-build55.5.7-bin_min_mix.jar | -| StarRocks | com.mysql.cj.jdbc.Driver | jdbc:mysql://localhost:3306/test | / | https://mvnrepository.com/artifact/mysql/mysql-connector-java | -| db2 | com.ibm.db2.jcc.DB2Driver | jdbc:db2://localhost:50000/testdb | com.ibm.db2.jcc.DB2XADataSource | https://mvnrepository.com/artifact/com.ibm.db2.jcc/db2jcc/db2jcc4 | +| datasource | driver | url | xa_data_source_class_name | maven | +|------------|----------------------------------------------|--------------------------------------------------------------------|----------------------------------------------------|-------------------------------------------------------------------------------------------------------------| +| MySQL | com.mysql.cj.jdbc.Driver | jdbc:mysql://localhost:3306/test | com.mysql.cj.jdbc.MysqlXADataSource | https://mvnrepository.com/artifact/mysql/mysql-connector-java | +| PostgreSQL | org.postgresql.Driver | jdbc:postgresql://localhost:5432/postgres | org.postgresql.xa.PGXADataSource | https://mvnrepository.com/artifact/org.postgresql/postgresql | +| DM | dm.jdbc.driver.DmDriver | jdbc:dm://localhost:5236 | dm.jdbc.driver.DmdbXADataSource | https://mvnrepository.com/artifact/com.dameng/DmJdbcDriver18 | +| Phoenix | org.apache.phoenix.queryserver.client.Driver | jdbc:phoenix:thin:url=http://localhost:8765;serialization=PROTOBUF | / | https://mvnrepository.com/artifact/com.aliyun.phoenix/ali-phoenix-shaded-thin-client | +| SQL Server | com.microsoft.sqlserver.jdbc.SQLServerDriver | jdbc:microsoft:sqlserver://localhost:1433 | com.microsoft.sqlserver.jdbc.SQLServerXADataSource | https://mvnrepository.com/artifact/com.microsoft.sqlserver/mssql-jdbc | +| Oracle | oracle.jdbc.OracleDriver | jdbc:oracle:thin:@localhost:1521/xepdb1 | oracle.jdbc.xa.OracleXADataSource | https://mvnrepository.com/artifact/com.oracle.database.jdbc/ojdbc8 | +| GBase8a | com.gbase.jdbc.Driver | jdbc:gbase://e2e_gbase8aDb:5258/test | / | https://www.gbase8.cn/wp-content/uploads/2020/10/gbase-connector-java-8.3.81.53-build55.5.7-bin_min_mix.jar | +| StarRocks | com.mysql.cj.jdbc.Driver | jdbc:mysql://localhost:3306/test | / | https://mvnrepository.com/artifact/mysql/mysql-connector-java | +| db2 | com.ibm.db2.jcc.DB2Driver | jdbc:db2://localhost:50000/testdb | com.ibm.db2.jcc.DB2XADataSource | https://mvnrepository.com/artifact/com.ibm.db2.jcc/db2jcc/db2jcc4 | ## Example @@ -167,4 +167,4 @@ jdbc { - [Feature] Support SQL Server JDBC Source ([2646](https://github.com/apache/incubator-seatunnel/pull/2646)) - [Feature] Support Oracle JDBC Source ([2550](https://github.com/apache/incubator-seatunnel/pull/2550)) - [Feature] Support StarRocks JDBC Source ([3060](https://github.com/apache/incubator-seatunnel/pull/3060)) -- [Feature] Support DB2 JDBC Sink ([2410](https://github.com/apache/incubator-seatunnel/pull/2410)) +- [Feature] Support DB2 JDBC Sink ([2410](https://github.com/apache/incubator-seatunnel/pull/2410)) \ No newline at end of file diff --git a/docs/en/connector-v2/sink/Tablestore.md b/docs/en/connector-v2/sink/Tablestore.md index c78521d263f..15ca34eda20 100644 --- a/docs/en/connector-v2/sink/Tablestore.md +++ b/docs/en/connector-v2/sink/Tablestore.md @@ -49,7 +49,7 @@ The table of Tablestore. The primaryKeys of Tablestore. -### common options +### common options [ config ] Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details. diff --git a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreConfig.java b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreConfig.java index 0fb02103ba7..4206abb9d2d 100644 --- a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreConfig.java +++ b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreConfig.java @@ -17,15 +17,42 @@ package org.apache.seatunnel.connectors.seatunnel.tablestore.config; +import org.apache.seatunnel.api.configuration.Option; +import org.apache.seatunnel.api.configuration.Options; + import java.io.Serializable; public class TablestoreConfig implements Serializable { - public static final String END_POINT = "end_point"; - public static final String INSTANCE_NAME = "instance_name"; - public static final String ACCESS_KEY_ID = "access_key_id"; - public static final String ACCESS_KEY_SECRET = "access_key_secret"; - public static final String TABLE = "table"; - public static final String BATCH_SIZE = "batch_size"; - public static final String DEFAULT_BATCH_INTERVAL_MS = "batch_interval_ms"; - public static final String PRIMARY_KEYS = "primary_keys"; + public static final Option END_POINT = Options.key("end_point") + .stringType() + .noDefaultValue() + .withDescription(" Tablestore end_point"); + public static final Option INSTANCE_NAME = Options.key("instance_name") + .stringType() + .noDefaultValue() + .withDescription(" Tablestore instance_name"); + public static final Option ACCESS_KEY_ID = Options.key("access_key_id") + .stringType() + .noDefaultValue() + .withDescription(" Tablestore access_key_id"); + public static final Option ACCESS_KEY_SECRET = Options.key("access_key_secret") + .stringType() + .noDefaultValue() + .withDescription(" Tablestore access_key_secret"); + public static final Option TABLE = Options.key("table") + .stringType() + .noDefaultValue() + .withDescription(" Tablestore table"); + public static final Option BATCH_SIZE = Options.key("batch_size") + .stringType() + .noDefaultValue() + .withDescription(" Tablestore batch_size"); + public static final Option DEFAULT_BATCH_INTERVAL_MS = Options.key("batch_interval_ms") + .stringType() + .noDefaultValue() + .withDescription(" Tablestore batch_interval_ms"); + public static final Option PRIMARY_KEYS = Options.key("primary_keys") + .stringType() + .noDefaultValue() + .withDescription(" Tablestore primary_keys"); } diff --git a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreOptions.java b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreOptions.java index ce53697ac9c..864668985ad 100644 --- a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreOptions.java +++ b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreOptions.java @@ -53,6 +53,7 @@ public TablestoreOptions(Config config) { this.accessKeyId = config.getString(TablestoreConfig.ACCESS_KEY_ID); this.accessKeySecret = config.getString(TablestoreConfig.ACCESS_KEY_SECRET); this.table = config.getString(TablestoreConfig.TABLE); + this.primaryKeys = config.getStringList(TablestoreConfig.PRIMARY_KEYS); if (config.hasPath(TablestoreConfig.BATCH_SIZE)) { this.batchSize = config.getInt(TablestoreConfig.BATCH_SIZE); @@ -60,8 +61,5 @@ public TablestoreOptions(Config config) { if (config.hasPath(TablestoreConfig.DEFAULT_BATCH_INTERVAL_MS)) { this.batchIntervalMs = config.getInt(TablestoreConfig.DEFAULT_BATCH_INTERVAL_MS); } - if (config.hasPath(TablestoreConfig.PRIMARY_KEYS)) { - this.primaryKeys = config.getStringList(TablestoreConfig.PRIMARY_KEYS); - } } } diff --git a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreSink.java b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreSink.java index 9160bcf4ca8..56ffdda5de5 100644 --- a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreSink.java +++ b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreSink.java @@ -57,7 +57,7 @@ public String getPluginName() { @Override public void prepare(Config pluginConfig) throws PrepareFailException { - CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, END_POINT, TABLE, INSTANCE_NAME, ACCESS_KEY_ID, ACCESS_KEY_SECRET, PRIMARY_KEYS); + CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, END_POINT.key(), TABLE.key(), INSTANCE_NAME.key(), ACCESS_KEY_ID.key(), ACCESS_KEY_SECRET.key(), PRIMARY_KEYS.key()); if (!result.isSuccess()) { throw new PrepareFailException(getPluginName(), PluginType.SOURCE, result.getMsg()); } diff --git a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreSinkClient.java b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreSinkClient.java index fb5f20619c0..ebeb6aa8b83 100644 --- a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreSinkClient.java +++ b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreSinkClient.java @@ -78,7 +78,7 @@ private void tryInit() throws IOException { initialize = true; } - public synchronized void write(RowPutChange rowPutChange) throws IOException { + public void write(RowPutChange rowPutChange) throws IOException { tryInit(); checkFlushException(); batchList.add(rowPutChange); @@ -88,7 +88,7 @@ public synchronized void write(RowPutChange rowPutChange) throws IOException { } } - public synchronized void close() throws IOException { + public void close() throws IOException { if (scheduledFuture != null) { scheduledFuture.cancel(false); scheduler.shutdown(); diff --git a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreSinkFactory.java b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreSinkFactory.java new file mode 100644 index 00000000000..3d45d7cdd82 --- /dev/null +++ b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreSinkFactory.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.tablestore.sink; + +import static org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.ACCESS_KEY_ID; +import static org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.ACCESS_KEY_SECRET; +import static org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.END_POINT; +import static org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.INSTANCE_NAME; +import static org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.PRIMARY_KEYS; +import static org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.TABLE; + +import org.apache.seatunnel.api.configuration.util.OptionRule; +import org.apache.seatunnel.api.table.factory.TableSinkFactory; +import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema; + +public class TablestoreSinkFactory implements TableSinkFactory { + @Override + public String factoryIdentifier() { + return "Tablestore"; + } + + @Override + public OptionRule optionRule() { + return OptionRule.builder().required(END_POINT, TABLE, INSTANCE_NAME, ACCESS_KEY_ID, ACCESS_KEY_SECRET, PRIMARY_KEYS, SeaTunnelSchema.SCHEMA).build(); + } +} diff --git a/seatunnel-dist/pom.xml b/seatunnel-dist/pom.xml index 87b156f0875..ba38d3ea374 100644 --- a/seatunnel-dist/pom.xml +++ b/seatunnel-dist/pom.xml @@ -325,7 +325,8 @@ org.apache.seatunnel connector-tablestore ${project.version} - provided + provided + From 5896145b02e9bf38cdf35e2c9990c1fd38bba57b Mon Sep 17 00:00:00 2001 From: liugddx <804167098@qq.com> Date: Fri, 11 Nov 2022 22:48:38 +0800 Subject: [PATCH 17/29] fix cr error --- .../tablestore/config/TablestoreOptions.java | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreOptions.java b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreOptions.java index 864668985ad..dd2663ea2c4 100644 --- a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreOptions.java +++ b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreOptions.java @@ -48,18 +48,18 @@ public class TablestoreOptions implements Serializable { public int batchIntervalMs = DEFAULT_BATCH_INTERVAL_MS; public TablestoreOptions(Config config) { - this.endpoint = config.getString(TablestoreConfig.END_POINT); - this.instanceName = config.getString(TablestoreConfig.INSTANCE_NAME); - this.accessKeyId = config.getString(TablestoreConfig.ACCESS_KEY_ID); - this.accessKeySecret = config.getString(TablestoreConfig.ACCESS_KEY_SECRET); - this.table = config.getString(TablestoreConfig.TABLE); - this.primaryKeys = config.getStringList(TablestoreConfig.PRIMARY_KEYS); + this.endpoint = config.getString(TablestoreConfig.END_POINT.key()); + this.instanceName = config.getString(TablestoreConfig.INSTANCE_NAME.key()); + this.accessKeyId = config.getString(TablestoreConfig.ACCESS_KEY_ID.key()); + this.accessKeySecret = config.getString(TablestoreConfig.ACCESS_KEY_SECRET.key()); + this.table = config.getString(TablestoreConfig.TABLE.key()); + this.primaryKeys = config.getStringList(TablestoreConfig.PRIMARY_KEYS.key()); - if (config.hasPath(TablestoreConfig.BATCH_SIZE)) { - this.batchSize = config.getInt(TablestoreConfig.BATCH_SIZE); + if (config.hasPath(TablestoreConfig.BATCH_SIZE.key())) { + this.batchSize = config.getInt(TablestoreConfig.BATCH_SIZE.key()); } - if (config.hasPath(TablestoreConfig.DEFAULT_BATCH_INTERVAL_MS)) { - this.batchIntervalMs = config.getInt(TablestoreConfig.DEFAULT_BATCH_INTERVAL_MS); + if (config.hasPath(TablestoreConfig.DEFAULT_BATCH_INTERVAL_MS.key())) { + this.batchIntervalMs = config.getInt(TablestoreConfig.DEFAULT_BATCH_INTERVAL_MS.key()); } } } From d5a839a45d6818b91a26de82bdaa3ad6aeec107a Mon Sep 17 00:00:00 2001 From: liugddx <804167098@qq.com> Date: Sat, 12 Nov 2022 09:10:32 +0800 Subject: [PATCH 18/29] fix cr error --- .../seatunnel/tablestore/sink/TablestoreSinkFactory.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreSinkFactory.java b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreSinkFactory.java index 3d45d7cdd82..e75f2c3c6e6 100644 --- a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreSinkFactory.java +++ b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreSinkFactory.java @@ -25,9 +25,13 @@ import static org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.TABLE; import org.apache.seatunnel.api.configuration.util.OptionRule; +import org.apache.seatunnel.api.table.factory.Factory; import org.apache.seatunnel.api.table.factory.TableSinkFactory; import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema; +import com.google.auto.service.AutoService; + +@AutoService(Factory.class) public class TablestoreSinkFactory implements TableSinkFactory { @Override public String factoryIdentifier() { From 7e84bd7a52e17c458a81318b60146de810b95896 Mon Sep 17 00:00:00 2001 From: liugddx <804167098@qq.com> Date: Sat, 12 Nov 2022 21:36:45 +0800 Subject: [PATCH 19/29] add optional params --- .../seatunnel/tablestore/sink/TablestoreSinkFactory.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreSinkFactory.java b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreSinkFactory.java index e75f2c3c6e6..f84005b6ce0 100644 --- a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreSinkFactory.java +++ b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreSinkFactory.java @@ -19,6 +19,8 @@ import static org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.ACCESS_KEY_ID; import static org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.ACCESS_KEY_SECRET; +import static org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.BATCH_SIZE; +import static org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.DEFAULT_BATCH_INTERVAL_MS; import static org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.END_POINT; import static org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.INSTANCE_NAME; import static org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.PRIMARY_KEYS; @@ -40,6 +42,9 @@ public String factoryIdentifier() { @Override public OptionRule optionRule() { - return OptionRule.builder().required(END_POINT, TABLE, INSTANCE_NAME, ACCESS_KEY_ID, ACCESS_KEY_SECRET, PRIMARY_KEYS, SeaTunnelSchema.SCHEMA).build(); + return OptionRule.builder() + .required(END_POINT, TABLE, INSTANCE_NAME, ACCESS_KEY_ID, ACCESS_KEY_SECRET, PRIMARY_KEYS, SeaTunnelSchema.SCHEMA) + .optional(DEFAULT_BATCH_INTERVAL_MS, BATCH_SIZE) + .build(); } } From 5df8159bd2902ec2de1bbeb266a7cfb77c517677 Mon Sep 17 00:00:00 2001 From: liugddx <804167098@qq.com> Date: Mon, 14 Nov 2022 15:09:57 +0800 Subject: [PATCH 20/29] fix cr error --- .../seatunnel/jdbc/internal/dialect/JdbcDialect.java | 7 +++++++ .../internal/dialect/tablestore/TablestoreDialect.java | 8 ++++++++ .../connectors/seatunnel/jdbc/source/JdbcSource.java | 4 +--- .../seatunnel/tablestore/config/TablestoreConfig.java | 4 ++-- 4 files changed, 18 insertions(+), 5 deletions(-) diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java index 12fc2cd8c19..150e8e9bd5c 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java @@ -17,12 +17,14 @@ package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect; +import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSourceOptions; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter; import java.io.Serializable; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; +import java.sql.ResultSetMetaData; import java.sql.SQLException; /** @@ -67,4 +69,9 @@ default PreparedStatement creatPreparedStatement(Connection connection, String q return statement; } + default ResultSetMetaData getResultSetMetaData(Connection conn, JdbcSourceOptions jdbcSourceOptions) throws SQLException { + PreparedStatement ps = conn.prepareStatement(jdbcSourceOptions.getJdbcConnectionOptions().getQuery()); + return ps.getMetaData(); + } + } diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/tablestore/TablestoreDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/tablestore/TablestoreDialect.java index bf1645a6f1b..7dfa1888e6a 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/tablestore/TablestoreDialect.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/tablestore/TablestoreDialect.java @@ -17,12 +17,14 @@ package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.tablestore; +import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSourceOptions; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper; import java.sql.Connection; import java.sql.PreparedStatement; +import java.sql.ResultSetMetaData; import java.sql.SQLException; public class TablestoreDialect implements JdbcDialect { @@ -47,4 +49,10 @@ public PreparedStatement creatPreparedStatement(Connection connection, String qu statement.setFetchSize(fetchSize); return statement; } + + @Override + public ResultSetMetaData getResultSetMetaData(Connection conn, JdbcSourceOptions jdbcSourceOptions) throws SQLException { + PreparedStatement ps = conn.prepareStatement(jdbcSourceOptions.getJdbcConnectionOptions().getQuery()); + return ps.executeQuery().getMetaData(); + } } diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java index 79d4671364c..918ea1052d1 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java @@ -44,7 +44,6 @@ import org.slf4j.LoggerFactory; import java.sql.Connection; -import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.SQLException; @@ -129,8 +128,7 @@ private SeaTunnelRowType initTableField(Connection conn) { ArrayList> seaTunnelDataTypes = new ArrayList<>(); ArrayList fieldNames = new ArrayList<>(); try { - PreparedStatement ps = conn.prepareStatement(jdbcSourceOptions.getJdbcConnectionOptions().getQuery()); - ResultSetMetaData resultSetMetaData = ps.executeQuery().getMetaData(); + ResultSetMetaData resultSetMetaData = jdbcDialect.getResultSetMetaData(conn, jdbcSourceOptions); for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) { fieldNames.add(resultSetMetaData.getColumnName(i)); seaTunnelDataTypes.add(jdbcDialectTypeMapper.mapping(resultSetMetaData, i)); diff --git a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreConfig.java b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreConfig.java index 4206abb9d2d..7c23f1ace94 100644 --- a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreConfig.java +++ b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreConfig.java @@ -49,10 +49,10 @@ public class TablestoreConfig implements Serializable { .withDescription(" Tablestore batch_size"); public static final Option DEFAULT_BATCH_INTERVAL_MS = Options.key("batch_interval_ms") .stringType() - .noDefaultValue() + .defaultValue("25") .withDescription(" Tablestore batch_interval_ms"); public static final Option PRIMARY_KEYS = Options.key("primary_keys") .stringType() - .noDefaultValue() + .defaultValue("1000") .withDescription(" Tablestore primary_keys"); } From de199514370f5e7c94876db5d0afade9c2d6b813 Mon Sep 17 00:00:00 2001 From: liugddx <804167098@qq.com> Date: Mon, 14 Nov 2022 16:34:24 +0800 Subject: [PATCH 21/29] Update seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreConfig.java Co-authored-by: Eric --- .../seatunnel/tablestore/config/TablestoreConfig.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreConfig.java b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreConfig.java index 7c23f1ace94..053195695ec 100644 --- a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreConfig.java +++ b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreConfig.java @@ -49,7 +49,7 @@ public class TablestoreConfig implements Serializable { .withDescription(" Tablestore batch_size"); public static final Option DEFAULT_BATCH_INTERVAL_MS = Options.key("batch_interval_ms") .stringType() - .defaultValue("25") + .defaultValue("1000") .withDescription(" Tablestore batch_interval_ms"); public static final Option PRIMARY_KEYS = Options.key("primary_keys") .stringType() From 1be80a9817acc2340cf70681924a7a2f94b20416 Mon Sep 17 00:00:00 2001 From: liugddx <804167098@qq.com> Date: Mon, 14 Nov 2022 16:34:36 +0800 Subject: [PATCH 22/29] Update seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreConfig.java Co-authored-by: Eric --- .../seatunnel/tablestore/config/TablestoreConfig.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreConfig.java b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreConfig.java index 053195695ec..ccdde75eb62 100644 --- a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreConfig.java +++ b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreConfig.java @@ -45,7 +45,7 @@ public class TablestoreConfig implements Serializable { .withDescription(" Tablestore table"); public static final Option BATCH_SIZE = Options.key("batch_size") .stringType() - .noDefaultValue() + .defaultValue("25") .withDescription(" Tablestore batch_size"); public static final Option DEFAULT_BATCH_INTERVAL_MS = Options.key("batch_interval_ms") .stringType() From 3e5c3eba289a09754fc4625a462edc61697fdff1 Mon Sep 17 00:00:00 2001 From: liugddx <804167098@qq.com> Date: Mon, 14 Nov 2022 16:34:48 +0800 Subject: [PATCH 23/29] Update seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreConfig.java Co-authored-by: Eric --- .../seatunnel/tablestore/config/TablestoreConfig.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreConfig.java b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreConfig.java index ccdde75eb62..90ddaeaddbf 100644 --- a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreConfig.java +++ b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreConfig.java @@ -47,7 +47,7 @@ public class TablestoreConfig implements Serializable { .stringType() .defaultValue("25") .withDescription(" Tablestore batch_size"); - public static final Option DEFAULT_BATCH_INTERVAL_MS = Options.key("batch_interval_ms") + public static final Option BATCH_INTERVAL_MS = Options.key("batch_interval_ms") .stringType() .defaultValue("1000") .withDescription(" Tablestore batch_interval_ms"); From f2e4c6b6a39954d0ba1efba496b513ff3f22117c Mon Sep 17 00:00:00 2001 From: liugddx <804167098@qq.com> Date: Mon, 14 Nov 2022 16:35:00 +0800 Subject: [PATCH 24/29] Update seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreOptions.java Co-authored-by: Eric --- .../seatunnel/tablestore/config/TablestoreOptions.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreOptions.java b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreOptions.java index dd2663ea2c4..f024033140e 100644 --- a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreOptions.java +++ b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreOptions.java @@ -29,7 +29,7 @@ @AllArgsConstructor public class TablestoreOptions implements Serializable { - private static final int DEFAULT_BATCH_SIZE = 25; + private static final int DEFAULT_BATCH_INTERVAL_MS = 1000; private String endpoint; From c2883a5d61c0858e9378f93331a88369c062ac1a Mon Sep 17 00:00:00 2001 From: liugddx <804167098@qq.com> Date: Mon, 14 Nov 2022 16:35:13 +0800 Subject: [PATCH 25/29] Update seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreConfig.java Co-authored-by: Eric --- .../seatunnel/tablestore/config/TablestoreConfig.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreConfig.java b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreConfig.java index 90ddaeaddbf..4a79197c6c9 100644 --- a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreConfig.java +++ b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreConfig.java @@ -53,6 +53,6 @@ public class TablestoreConfig implements Serializable { .withDescription(" Tablestore batch_interval_ms"); public static final Option PRIMARY_KEYS = Options.key("primary_keys") .stringType() - .defaultValue("1000") + .noDefaultValue() .withDescription(" Tablestore primary_keys"); } From 557fed0dba8b1df25fe93ddc72647b7871ac12f4 Mon Sep 17 00:00:00 2001 From: liugddx <804167098@qq.com> Date: Mon, 14 Nov 2022 16:35:22 +0800 Subject: [PATCH 26/29] Update seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreOptions.java Co-authored-by: Eric --- .../seatunnel/tablestore/config/TablestoreOptions.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreOptions.java b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreOptions.java index f024033140e..f6be9814ecf 100644 --- a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreOptions.java +++ b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreOptions.java @@ -30,7 +30,7 @@ public class TablestoreOptions implements Serializable { - private static final int DEFAULT_BATCH_INTERVAL_MS = 1000; + private String endpoint; From 6ff5e159995c452cc298a0435379ecba6359f85b Mon Sep 17 00:00:00 2001 From: liugddx <804167098@qq.com> Date: Mon, 14 Nov 2022 16:35:29 +0800 Subject: [PATCH 27/29] Update seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreOptions.java Co-authored-by: Eric --- .../seatunnel/tablestore/config/TablestoreOptions.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreOptions.java b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreOptions.java index f6be9814ecf..1d6f6ce5c39 100644 --- a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreOptions.java +++ b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreOptions.java @@ -44,7 +44,7 @@ public class TablestoreOptions implements Serializable { private List primaryKeys; - public int batchSize = DEFAULT_BATCH_SIZE; + public int batchSize = Integer.valueOf(BATCH_SIZE.defaultValue()); public int batchIntervalMs = DEFAULT_BATCH_INTERVAL_MS; public TablestoreOptions(Config config) { From d76fa8ab0d5faa5fc450595b679438e813afc60e Mon Sep 17 00:00:00 2001 From: liugddx <804167098@qq.com> Date: Mon, 14 Nov 2022 16:35:38 +0800 Subject: [PATCH 28/29] Update seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreOptions.java Co-authored-by: Eric --- .../seatunnel/tablestore/config/TablestoreOptions.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreOptions.java b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreOptions.java index 1d6f6ce5c39..726c5c80d65 100644 --- a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreOptions.java +++ b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreOptions.java @@ -45,7 +45,7 @@ public class TablestoreOptions implements Serializable { private List primaryKeys; public int batchSize = Integer.valueOf(BATCH_SIZE.defaultValue()); - public int batchIntervalMs = DEFAULT_BATCH_INTERVAL_MS; + public int batchIntervalMs = Integer.valueOf(BATCH_INTERVAL_MS.defaultValue()); public TablestoreOptions(Config config) { this.endpoint = config.getString(TablestoreConfig.END_POINT.key()); From ec0786c2d9249960790951ea148296fa74fa9fe2 Mon Sep 17 00:00:00 2001 From: liugddx <804167098@qq.com> Date: Mon, 14 Nov 2022 16:38:25 +0800 Subject: [PATCH 29/29] fix cr error --- .../tablestore/config/TablestoreOptions.java | 17 +++++++++-------- .../tablestore/sink/TablestoreSinkFactory.java | 4 ++-- 2 files changed, 11 insertions(+), 10 deletions(-) diff --git a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreOptions.java b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreOptions.java index 726c5c80d65..7faa8a688fb 100644 --- a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreOptions.java +++ b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreOptions.java @@ -17,6 +17,9 @@ package org.apache.seatunnel.connectors.seatunnel.tablestore.config; +import static org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.BATCH_INTERVAL_MS; +import static org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.BATCH_SIZE; + import org.apache.seatunnel.shade.com.typesafe.config.Config; import lombok.AllArgsConstructor; @@ -29,8 +32,6 @@ @AllArgsConstructor public class TablestoreOptions implements Serializable { - - private String endpoint; @@ -44,8 +45,8 @@ public class TablestoreOptions implements Serializable { private List primaryKeys; - public int batchSize = Integer.valueOf(BATCH_SIZE.defaultValue()); - public int batchIntervalMs = Integer.valueOf(BATCH_INTERVAL_MS.defaultValue()); + public int batchSize = Integer.parseInt(BATCH_SIZE.defaultValue()); + public int batchIntervalMs = Integer.parseInt(BATCH_INTERVAL_MS.defaultValue()); public TablestoreOptions(Config config) { this.endpoint = config.getString(TablestoreConfig.END_POINT.key()); @@ -55,11 +56,11 @@ public TablestoreOptions(Config config) { this.table = config.getString(TablestoreConfig.TABLE.key()); this.primaryKeys = config.getStringList(TablestoreConfig.PRIMARY_KEYS.key()); - if (config.hasPath(TablestoreConfig.BATCH_SIZE.key())) { - this.batchSize = config.getInt(TablestoreConfig.BATCH_SIZE.key()); + if (config.hasPath(BATCH_SIZE.key())) { + this.batchSize = config.getInt(BATCH_SIZE.key()); } - if (config.hasPath(TablestoreConfig.DEFAULT_BATCH_INTERVAL_MS.key())) { - this.batchIntervalMs = config.getInt(TablestoreConfig.DEFAULT_BATCH_INTERVAL_MS.key()); + if (config.hasPath(TablestoreConfig.BATCH_INTERVAL_MS.key())) { + this.batchIntervalMs = config.getInt(TablestoreConfig.BATCH_INTERVAL_MS.key()); } } } diff --git a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreSinkFactory.java b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreSinkFactory.java index f84005b6ce0..2310292a795 100644 --- a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreSinkFactory.java +++ b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreSinkFactory.java @@ -19,8 +19,8 @@ import static org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.ACCESS_KEY_ID; import static org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.ACCESS_KEY_SECRET; +import static org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.BATCH_INTERVAL_MS; import static org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.BATCH_SIZE; -import static org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.DEFAULT_BATCH_INTERVAL_MS; import static org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.END_POINT; import static org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.INSTANCE_NAME; import static org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.PRIMARY_KEYS; @@ -44,7 +44,7 @@ public String factoryIdentifier() { public OptionRule optionRule() { return OptionRule.builder() .required(END_POINT, TABLE, INSTANCE_NAME, ACCESS_KEY_ID, ACCESS_KEY_SECRET, PRIMARY_KEYS, SeaTunnelSchema.SCHEMA) - .optional(DEFAULT_BATCH_INTERVAL_MS, BATCH_SIZE) + .optional(BATCH_INTERVAL_MS, BATCH_SIZE) .build(); } }