diff --git a/chunjun-connectors/chunjun-connector-greenplum/src/main/java/com/dtstack/chunjun/connector/greenplum/dialect/GreenplumDialect.java b/chunjun-connectors/chunjun-connector-greenplum/src/main/java/com/dtstack/chunjun/connector/greenplum/dialect/GreenplumDialect.java index 942af88c1a..62045c9e0b 100644 --- a/chunjun-connectors/chunjun-connector-greenplum/src/main/java/com/dtstack/chunjun/connector/greenplum/dialect/GreenplumDialect.java +++ b/chunjun-connectors/chunjun-connector-greenplum/src/main/java/com/dtstack/chunjun/connector/greenplum/dialect/GreenplumDialect.java @@ -33,7 +33,8 @@ public class GreenplumDialect extends PostgresqlDialect { private static final String DIALECT_NAME = "Greenplum"; private static final String DRIVER = "com.pivotal.jdbc.GreenplumDriver"; - private static final String URL_START = "jdbc:pivotal:greenplum:"; + public static final String URL_START = "jdbc:pivotal:greenplum:"; + public static final String DATABASE_NAME = ";DatabaseName="; @Override public String dialectName() { diff --git a/chunjun-connectors/chunjun-connector-greenplum/src/main/java/com/dtstack/chunjun/connector/greenplum/sink/GreenplumOutputFormat.java b/chunjun-connectors/chunjun-connector-greenplum/src/main/java/com/dtstack/chunjun/connector/greenplum/sink/GreenplumOutputFormat.java new file mode 100644 index 0000000000..9c19d5cdfc --- /dev/null +++ b/chunjun-connectors/chunjun-connector-greenplum/src/main/java/com/dtstack/chunjun/connector/greenplum/sink/GreenplumOutputFormat.java @@ -0,0 +1,246 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.dtstack.chunjun.connector.greenplum.sink; + +import com.dtstack.chunjun.connector.jdbc.converter.JdbcColumnConverter; +import com.dtstack.chunjun.connector.jdbc.sink.JdbcOutputFormat; +import com.dtstack.chunjun.connector.postgresql.converter.PostgresqlColumnConverter; +import com.dtstack.chunjun.connector.postgresql.dialect.PostgresqlDialect; +import com.dtstack.chunjun.constants.ConstantValue; +import com.dtstack.chunjun.element.ColumnRowData; +import com.dtstack.chunjun.enums.EWriteMode; +import com.dtstack.chunjun.throwable.NoRestartException; +import com.dtstack.chunjun.throwable.WriteRecordException; + +import org.apache.flink.streaming.api.CheckpointingMode; +import org.apache.flink.table.data.RowData; + +import org.apache.commons.lang3.math.NumberUtils; +import org.postgresql.copy.CopyManager; +import org.postgresql.core.BaseConnection; + +import java.io.ByteArrayInputStream; +import java.math.BigDecimal; +import java.nio.charset.StandardCharsets; +import java.sql.Connection; +import java.sql.SQLException; + +/** + * @program: flinkx + * @author: jier + */ +public class GreenplumOutputFormat extends JdbcOutputFormat { + + // pg 字符串里含有\u0000 会报错 ERROR: invalid byte sequence for encoding "UTF8": 0x00 + public static final String SPACE = "\u0000"; + + private static final String LINE_DELIMITER = "\n"; + private CopyManager copyManager; + private boolean disableCopyMode = false; + private String copySql = ""; + public static final String INSERT_SQL_MODE_TYPE = "copy"; + private static final String DEFAULT_FIELD_DELIMITER = "\001"; + + private static final String DEFAULT_NULL_VALUE = "\002"; + + /** 数据源类型信息 * */ + private final String dbType = DbType.POSTGRESQL.name(); + + @Override + protected void openInternal(int taskNumber, int numTasks) { + super.openInternal(taskNumber, numTasks); + try { + // check is use copy mode for insert + disableCopyMode = + jdbcConf.getInsertSqlMode() != null + && !INSERT_SQL_MODE_TYPE.equalsIgnoreCase(jdbcConf.getInsertSqlMode()); + if (EWriteMode.INSERT.name().equalsIgnoreCase(jdbcConf.getMode()) && !disableCopyMode) { + LOG.info("will use copy mode"); + copyManager = new CopyManager((BaseConnection) dbConn); + + PostgresqlDialect pgDialect = (PostgresqlDialect) jdbcDialect; + copySql = + pgDialect.getCopyStatement( + jdbcConf.getSchema(), + jdbcConf.getTable(), + columnNameList.toArray(new String[0]), + DEFAULT_FIELD_DELIMITER, + DEFAULT_NULL_VALUE); + + LOG.info("write sql:{}", copySql); + } + checkUpsert(); + if (rowConverter instanceof PostgresqlColumnConverter + && dbConn instanceof BaseConnection) { + ((PostgresqlColumnConverter) rowConverter).setConnection((BaseConnection) dbConn); + } + } catch (SQLException sqe) { + throw new IllegalArgumentException("checkUpsert() failed.", sqe); + } + } + + @Override + protected void writeSingleRecordInternal(RowData row) throws WriteRecordException { + if (disableCopyMode) { + super.writeSingleRecordInternal(row); + } else { + if (rowConverter instanceof JdbcColumnConverter) { + ColumnRowData colRowData = (ColumnRowData) row; + // write with copy + int index = 0; + try { + StringBuilder rowStr = new StringBuilder(); + int lastIndex = row.getArity() - 1; + for (; index < row.getArity(); index++) { + appendColumn(colRowData, index, rowStr, index == lastIndex); + } + String rowVal = copyModeReplace(rowStr.toString()); + try (ByteArrayInputStream bi = + new ByteArrayInputStream(rowVal.getBytes(StandardCharsets.UTF_8))) { + copyManager.copyIn(copySql, bi); + } + } catch (Exception e) { + processWriteException(e, index, row); + } + } else { + throw new NoRestartException("copy mode only support data sync with out table"); + } + } + } + + @Override + protected void writeMultipleRecordsInternal() throws Exception { + if (disableCopyMode) { + super.writeMultipleRecordsInternal(); + } else { + if (rowConverter instanceof JdbcColumnConverter) { + StringBuilder rowsStrBuilder = new StringBuilder(128); + for (RowData row : rows) { + ColumnRowData colRowData = (ColumnRowData) row; + int lastIndex = row.getArity() - 1; + StringBuilder rowStr = new StringBuilder(128); + for (int index = 0; index < row.getArity(); index++) { + appendColumn(colRowData, index, rowStr, index == lastIndex); + } + String tempData = rowStr.toString(); + rowsStrBuilder.append(copyModeReplace(tempData)).append(LINE_DELIMITER); + } + String rowVal = rowsStrBuilder.toString(); + try (ByteArrayInputStream bi = + new ByteArrayInputStream(rowVal.getBytes(StandardCharsets.UTF_8))) { + copyManager.copyIn(copySql, bi); + if (checkpointEnabled && CheckpointingMode.EXACTLY_ONCE == checkpointMode) { + rowsOfCurrentTransaction += rows.size(); + } + } + } else { + throw new NoRestartException("copy mode only support data sync with out table"); + } + } + } + + private void appendColumn( + ColumnRowData colRowData, int pos, StringBuilder rowStr, boolean isLast) { + Object col = colRowData.getField(pos); + if (col == null) { + rowStr.append(DEFAULT_NULL_VALUE); + } else { + rowStr.append(col); + } + if (!isLast) { + rowStr.append(DEFAULT_FIELD_DELIMITER); + } + } + + /** + * \r \n \ 等特殊字符串需要转义 + * + * @return + */ + private String copyModeReplace(String rowStr) { + if (rowStr.contains("\\")) { + rowStr = rowStr.replaceAll("\\\\", "\\\\\\\\"); + } + if (rowStr.contains("\r")) { + rowStr = rowStr.replaceAll("\r", "\\\\r"); + } + + if (rowStr.contains("\n")) { + rowStr = rowStr.replaceAll("\n", "\\\\n"); + } + + // pg 字符串里含有\u0000 会报错 ERROR: invalid byte sequence for encoding "UTF8": 0x00 + if (rowStr.contains(SPACE)) { + rowStr = rowStr.replaceAll(SPACE, ""); + } + return rowStr; + } + + /** 数据源类型 * */ + public enum DbType { + POSTGRESQL, + ADB + } + + /** + * 当mode为update时进行校验 + * + * @return + * @throws SQLException + */ + public void checkUpsert() throws SQLException { + if (EWriteMode.UPDATE.name().equalsIgnoreCase(jdbcConf.getMode())) { + try (Connection connection = getConnection()) { + + // 效验版本 + String databaseProductVersion = + connection.getMetaData().getDatabaseProductVersion(); + LOG.info("source version is {}", databaseProductVersion); + String[] split = databaseProductVersion.split("\\."); + // 10.1.12 + if (split.length > 2) { + databaseProductVersion = split[0] + ConstantValue.POINT_SYMBOL + split[1]; + } + + if (NumberUtils.isNumber(databaseProductVersion)) { + BigDecimal sourceVersion = new BigDecimal(databaseProductVersion); + if (dbType.equalsIgnoreCase(DbType.POSTGRESQL.name())) { + // pg大于等于9.5 + if (sourceVersion.compareTo(new BigDecimal("9.5")) < 0) { + throw new RuntimeException( + "the postgreSql version is [" + + databaseProductVersion + + "] and must greater than or equal to 9.5 when you use update mode and source is " + + DbType.POSTGRESQL.name()); + } + } else if (dbType.equalsIgnoreCase(DbType.ADB.name())) { + // adb大于等于9.4 + if (sourceVersion.compareTo(new BigDecimal("9.4")) < 0) { + throw new RuntimeException( + "the postgreSql version is [" + + databaseProductVersion + + "] and must greater than or equal to 9.4 when you use update mode and source is " + + DbType.ADB.name()); + } + } + } + } + } + } +} diff --git a/chunjun-connectors/chunjun-connector-greenplum/src/main/java/com/dtstack/chunjun/connector/greenplum/sink/GreenplumSinkFactory.java b/chunjun-connectors/chunjun-connector-greenplum/src/main/java/com/dtstack/chunjun/connector/greenplum/sink/GreenplumSinkFactory.java index d0aa9d0ae1..fa38a5d998 100644 --- a/chunjun-connectors/chunjun-connector-greenplum/src/main/java/com/dtstack/chunjun/connector/greenplum/sink/GreenplumSinkFactory.java +++ b/chunjun-connectors/chunjun-connector-greenplum/src/main/java/com/dtstack/chunjun/connector/greenplum/sink/GreenplumSinkFactory.java @@ -20,16 +20,44 @@ import com.dtstack.chunjun.conf.SyncConf; import com.dtstack.chunjun.connector.greenplum.dialect.GreenplumDialect; -import com.dtstack.chunjun.connector.postgresql.sink.PostgresqlSinkFactory; +import com.dtstack.chunjun.connector.jdbc.sink.JdbcOutputFormatBuilder; +import com.dtstack.chunjun.connector.jdbc.sink.JdbcSinkFactory; +import com.dtstack.chunjun.connector.postgresql.dialect.PostgresqlDialect; + +import org.apache.commons.lang.StringUtils; + +import static com.dtstack.chunjun.connector.greenplum.sink.GreenplumOutputFormat.INSERT_SQL_MODE_TYPE; /** * company www.dtstack.com * * @author jier */ -public class GreenplumSinkFactory extends PostgresqlSinkFactory { +public class GreenplumSinkFactory extends JdbcSinkFactory { public GreenplumSinkFactory(SyncConf syncConf) { - super(syncConf, new GreenplumDialect()); + super(syncConf, null); + if (syncConf.getWriter().getParameter().get("insertSqlMode") != null + && INSERT_SQL_MODE_TYPE.equalsIgnoreCase( + syncConf.getWriter().getParameter().get("insertSqlMode").toString())) { + this.jdbcDialect = new PostgresqlDialect(); + String pgUrl = changeToPostgresqlUrl(this.jdbcConf.getJdbcUrl()); + this.jdbcConf.setJdbcUrl(pgUrl); + } else { + this.jdbcDialect = new GreenplumDialect(); + } + } + + @Override + protected JdbcOutputFormatBuilder getBuilder() { + return new JdbcOutputFormatBuilder(new GreenplumOutputFormat()); + } + + private String changeToPostgresqlUrl(String gpUrl) { + String pgUrl = + StringUtils.replaceOnce( + gpUrl, GreenplumDialect.URL_START, PostgresqlDialect.URL_START); + pgUrl = StringUtils.replaceOnce(pgUrl, GreenplumDialect.DATABASE_NAME, "/"); + return pgUrl; } } diff --git a/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/conf/JdbcConf.java b/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/conf/JdbcConf.java index dfe13a856d..a3f4ca1b38 100644 --- a/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/conf/JdbcConf.java +++ b/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/conf/JdbcConf.java @@ -106,6 +106,10 @@ public class JdbcConf extends ChunJunCommonConf implements Serializable { /** upsert 写数据库时,是否null覆盖原来的值 */ protected boolean allReplace = false; + protected boolean isAutoCommit = false; + + private boolean defineColumnTypeForStatement = false; + public Boolean getInitReporter() { return initReporter; } @@ -419,6 +423,14 @@ public void setAllReplace(boolean allReplace) { this.allReplace = allReplace; } + public boolean isAutoCommit() { + return isAutoCommit; + } + + public boolean isDefineColumnTypeForStatement() { + return defineColumnTypeForStatement; + } + public String getSplitStrategy() { return splitStrategy; } @@ -485,9 +497,13 @@ public String toString() { + increment + ", polling=" + polling + + ", pollingFromMax=" + + pollingFromMax + ", increColumn='" + increColumn + '\'' + + ", isOrderBy=" + + isOrderBy + ", increColumnIndex=" + increColumnIndex + ", increColumnType='" @@ -508,6 +524,8 @@ public String toString() { + restoreColumnIndex + ", useMaxFunc=" + useMaxFunc + + ", initReporter=" + + initReporter + ", mode='" + mode + '\'' @@ -521,6 +539,10 @@ public String toString() { + updateKey + ", allReplace=" + allReplace + + ", isAutoCommit=" + + isAutoCommit + + ", defineColumnTypeForStatement=" + + defineColumnTypeForStatement + '}'; } } diff --git a/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/sink/JdbcOutputFormat.java b/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/sink/JdbcOutputFormat.java index e2db65adf5..2dd7433c38 100644 --- a/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/sink/JdbcOutputFormat.java +++ b/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/sink/JdbcOutputFormat.java @@ -66,7 +66,6 @@ public class JdbcOutputFormat extends BaseRichOutputFormat { protected JdbcDialect jdbcDialect; protected transient Connection dbConn; - protected boolean autoCommit = true; protected transient PreparedStmtProxy stmtProxy; @@ -87,10 +86,7 @@ protected void openInternal(int taskNumber, int numTasks) { try { dbConn = getConnection(); // 默认关闭事务自动提交,手动控制事务 - if (Semantic.EXACTLY_ONCE == semantic) { - autoCommit = false; - dbConn.setAutoCommit(autoCommit); - } + dbConn.setAutoCommit(jdbcConf.isAutoCommit()); if (!EWriteMode.INSERT.name().equalsIgnoreCase(jdbcConf.getMode())) { List updateKey = jdbcConf.getUniqueKey(); if (CollectionUtils.isEmpty(updateKey)) { @@ -135,6 +131,11 @@ protected void writeSingleRecordInternal(RowData row) throws WriteRecordExceptio int index = 0; try { stmtProxy.writeSingleRecordInternal(row); + if (Semantic.EXACTLY_ONCE == semantic) { + rowsOfCurrentTransaction += rows.size(); + } else { + JdbcUtil.commit(dbConn); + } } catch (Exception e) { JdbcUtil.rollBack(dbConn); processWriteException(e, index, row); @@ -164,6 +165,8 @@ protected void writeMultipleRecordsInternal() throws Exception { // 开启了cp,但是并没有使用2pc方式让下游数据可见 if (Semantic.EXACTLY_ONCE == semantic) { rowsOfCurrentTransaction += rows.size(); + } else { + JdbcUtil.commit(dbConn); } } catch (Exception e) { LOG.warn( @@ -216,7 +219,7 @@ public void rollback(long checkpointId) throws Exception { public void doCommit() throws SQLException { try { - if (!autoCommit) { + if (!jdbcConf.isAutoCommit()) { dbConn.commit(); } snapshotWriteCounter.add(rowsOfCurrentTransaction); diff --git a/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/sink/JdbcOutputFormatBuilder.java b/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/sink/JdbcOutputFormatBuilder.java index 355a41162f..7b75737cbd 100644 --- a/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/sink/JdbcOutputFormatBuilder.java +++ b/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/sink/JdbcOutputFormatBuilder.java @@ -20,6 +20,7 @@ import com.dtstack.chunjun.connector.jdbc.conf.JdbcConf; import com.dtstack.chunjun.connector.jdbc.dialect.JdbcDialect; import com.dtstack.chunjun.converter.AbstractRowConverter; +import com.dtstack.chunjun.enums.Semantic; import com.dtstack.chunjun.sink.format.BaseRichOutputFormatBuilder; import org.apache.commons.lang3.StringUtils; @@ -66,6 +67,13 @@ protected void checkFormat() { if (StringUtils.isBlank(jdbcConf.getJdbcUrl())) { sb.append("No jdbc url supplied;\n"); } + + if (Semantic.getByName(jdbcConf.getSemantic()) == Semantic.EXACTLY_ONCE + && jdbcConf.isAutoCommit()) { + sb.append( + "Exactly-once semantics requires that the jdbc driver is not in auto-commit mode;\n"); + } + if (sb.length() > 0) { throw new IllegalArgumentException(sb.toString()); } diff --git a/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/source/JdbcDynamicTableSource.java b/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/source/JdbcDynamicTableSource.java index 93893a8cbf..b33ec6832f 100644 --- a/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/source/JdbcDynamicTableSource.java +++ b/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/source/JdbcDynamicTableSource.java @@ -154,7 +154,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderCon if (StringUtils.isNotBlank(splitPk)) { FieldConf fieldConf = FieldConf.getSameNameMetaColumn(jdbcConf.getColumn(), splitPk); if (fieldConf != null) { - jdbcConf.setSplitPk(fieldConf.getType()); + jdbcConf.setSplitPk(fieldConf.getName()); splitKeyUtil = jdbcDialect.initKeyUtil(fieldConf.getName(), fieldConf.getType()); } } diff --git a/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/source/JdbcInputFormat.java b/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/source/JdbcInputFormat.java index 63f48f3ea6..06a30b27f9 100644 --- a/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/source/JdbcInputFormat.java +++ b/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/source/JdbcInputFormat.java @@ -707,6 +707,10 @@ protected void executeQuery(String startLocation) throws SQLException { } } else { statement = dbConn.createStatement(resultSetType, resultSetConcurrency); + if (jdbcConf.isDefineColumnTypeForStatement() + && StringUtils.isBlank(jdbcConf.getCustomSql())) { + defineColumnType(statement); + } statement.setFetchSize(jdbcConf.getFetchSize()); statement.setQueryTimeout(jdbcConf.getQueryTimeOut()); resultSet = statement.executeQuery(jdbcConf.getQuerySql()); @@ -714,6 +718,8 @@ protected void executeQuery(String startLocation) throws SQLException { } } + protected void defineColumnType(Statement statement) throws SQLException {} + /** init prepareStatement */ public void initPrepareStatement(String querySql) throws SQLException { ps = dbConn.prepareStatement(querySql, resultSetType, resultSetConcurrency); diff --git a/chunjun-connectors/chunjun-connector-oracle/src/main/java/com/dtstack/chunjun/connector/oracle/converter/OracleColumnConverter.java b/chunjun-connectors/chunjun-connector-oracle/src/main/java/com/dtstack/chunjun/connector/oracle/converter/OracleColumnConverter.java index c9287ae1c8..e51ac41f7d 100644 --- a/chunjun-connectors/chunjun-connector-oracle/src/main/java/com/dtstack/chunjun/connector/oracle/converter/OracleColumnConverter.java +++ b/chunjun-connectors/chunjun-connector-oracle/src/main/java/com/dtstack/chunjun/connector/oracle/converter/OracleColumnConverter.java @@ -19,10 +19,13 @@ package com.dtstack.chunjun.connector.oracle.converter; import com.dtstack.chunjun.conf.ChunJunCommonConf; +import com.dtstack.chunjun.conf.FieldConf; import com.dtstack.chunjun.connector.jdbc.converter.JdbcColumnConverter; import com.dtstack.chunjun.connector.jdbc.statement.FieldNamedPreparedStatement; +import com.dtstack.chunjun.constants.ConstantValue; import com.dtstack.chunjun.converter.IDeserializationConverter; import com.dtstack.chunjun.converter.ISerializationConverter; +import com.dtstack.chunjun.element.AbstractBaseColumn; import com.dtstack.chunjun.element.ColumnRowData; import com.dtstack.chunjun.element.column.BigDecimalColumn; import com.dtstack.chunjun.element.column.BooleanColumn; @@ -31,17 +34,25 @@ import com.dtstack.chunjun.element.column.TimestampColumn; import org.apache.flink.connector.jdbc.utils.JdbcTypeUtil; +import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.TimestampType; import org.apache.flink.table.types.utils.TypeConversions; import oracle.sql.TIMESTAMP; +import org.apache.commons.lang3.StringUtils; import java.io.ByteArrayInputStream; import java.io.InputStream; import java.io.StringReader; -import java.math.BigDecimal; +import java.sql.ResultSet; import java.sql.Timestamp; +import java.time.ZoneId; +import java.util.Calendar; +import java.util.List; +import java.util.Locale; +import java.util.TimeZone; /** * company www.dtstack.com @@ -50,53 +61,167 @@ */ public class OracleColumnConverter extends JdbcColumnConverter { + private static final Calendar var4 = + Calendar.getInstance(TimeZone.getTimeZone("UTC"), Locale.US); + private int currentIndex = 0; + public OracleColumnConverter(RowType rowType, ChunJunCommonConf commonConf) { super(rowType, commonConf); } + @Override + @SuppressWarnings("unchecked") + public RowData toInternal(ResultSet resultSet) throws Exception { + List fieldConfList = commonConf.getColumn(); + ColumnRowData result; + if (fieldConfList.size() == 1 + && ConstantValue.STAR_SYMBOL.equals(fieldConfList.get(0).getName())) { + result = new ColumnRowData(fieldTypes.length); + for (int index = 0; index < fieldTypes.length; index++) { + Object field = resultSet.getObject(index + 1); + AbstractBaseColumn baseColumn = + (AbstractBaseColumn) toInternalConverters.get(index).deserialize(field); + result.addField(baseColumn); + } + return result; + } + currentIndex = 0; + result = new ColumnRowData(fieldConfList.size()); + for (FieldConf fieldConf : fieldConfList) { + AbstractBaseColumn baseColumn = null; + if (StringUtils.isBlank(fieldConf.getValue())) { + baseColumn = + (AbstractBaseColumn) + toInternalConverters.get(currentIndex).deserialize(resultSet); + currentIndex++; + } + result.addField(assembleFieldProps(fieldConf, baseColumn)); + } + return result; + } + @Override protected IDeserializationConverter createInternalConverter(LogicalType type) { switch (type.getTypeRoot()) { case BOOLEAN: - return val -> new BooleanColumn(Boolean.parseBoolean(val.toString())); + return (IDeserializationConverter) + field -> { + Object object = field.getObject(currentIndex + 1); + if (object == null) { + return null; + } + return new BooleanColumn(Boolean.parseBoolean(object.toString())); + }; case TINYINT: - return val -> new BigDecimalColumn(((Integer) val).byteValue()); + return (IDeserializationConverter) + field -> { + Object object = field.getObject(currentIndex + 1); + if (object == null) { + return null; + } + return new BigDecimalColumn(((Integer) object).byteValue()); + }; case SMALLINT: case INTEGER: - return val -> new BigDecimalColumn((Integer) val); case FLOAT: - return val -> new BigDecimalColumn((Float) val); case DOUBLE: - return val -> new BigDecimalColumn((Double) val); case BIGINT: - return val -> new BigDecimalColumn((Long) val); case DECIMAL: - return val -> new BigDecimalColumn((BigDecimal) val); + return (IDeserializationConverter) + field -> { + String object = field.getString(currentIndex + 1); + if (object == null) { + return null; + } + return new BigDecimalColumn(object); + }; + case CHAR: case VARCHAR: if (type instanceof ClobType) { - return val -> { - oracle.sql.CLOB clob = (oracle.sql.CLOB) val; - return new StringColumn(ConvertUtil.convertClob(clob)); - }; + return (IDeserializationConverter) + field -> { + Object object = field.getObject(currentIndex + 1); + if (object == null) { + return null; + } + oracle.sql.CLOB clob = (oracle.sql.CLOB) object; + return new StringColumn(ConvertUtil.convertClob(clob)); + }; } - return val -> new StringColumn((String) val); + return (IDeserializationConverter) + field -> { + String object = field.getString(currentIndex + 1); + if (object == null) { + return null; + } + return new StringColumn(object); + }; case DATE: - return val -> new TimestampColumn((Timestamp) val, 0); - case TIMESTAMP_WITH_TIME_ZONE: + return (IDeserializationConverter) + field -> { + Timestamp object = field.getTimestamp(currentIndex + 1); + if (object == null) { + return null; + } + return new TimestampColumn(object, 0); + }; case TIMESTAMP_WITHOUT_TIME_ZONE: - return val -> new TimestampColumn(((TIMESTAMP) val).timestampValue()); + return (IDeserializationConverter) + field -> { + int precision = ((TimestampType) (type)).getPrecision(); + Timestamp object = field.getTimestamp(currentIndex + 1); + if (object == null) { + return null; + } + return new TimestampColumn(object, precision); + }; + case TIMESTAMP_WITH_TIME_ZONE: + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + return (IDeserializationConverter) + field -> { + Object val = field.getObject(currentIndex + 1); + if (val == null) { + return null; + } + if (val instanceof TIMESTAMP) { + return new TimestampColumn(((TIMESTAMP) val).timestampValue()); + } else if (val instanceof oracle.sql.TIMESTAMPLTZ) { + return new TimestampColumn( + ((oracle.sql.TIMESTAMPLTZ) val) + .timestampValue(null, Calendar.getInstance())); + } else if (val instanceof oracle.sql.TIMESTAMPTZ) { + return new TimestampColumn( + (toTimestamp2( + ((oracle.sql.TIMESTAMPTZ) val).getBytes(), null))); + } else if (val instanceof Timestamp) { + return new TimestampColumn((Timestamp) val); + } + throw new RuntimeException("not support type" + val.getClass()); + }; case BINARY: case VARBINARY: - return val -> { - if (type instanceof BlobType) { - oracle.sql.BLOB blob = (oracle.sql.BLOB) val; - byte[] bytes = ConvertUtil.toByteArray(blob); - return new BytesColumn(bytes); - } else { - return new BytesColumn((byte[]) val); - } - }; + if (type instanceof BlobType) { + return (IDeserializationConverter) + field -> { + Object object = field.getObject(currentIndex + 1); + if (object == null) { + return null; + } + oracle.sql.BLOB blob = (oracle.sql.BLOB) object; + byte[] bytes = ConvertUtil.toByteArray(blob); + return new BytesColumn(bytes); + }; + } else { + return (IDeserializationConverter) + field -> { + byte[] object = field.getBytes(currentIndex + 1); + if (object == null) { + return null; + } + return new BytesColumn(object); + }; + } default: throw new UnsupportedOperationException("Unsupported type:" + type); } @@ -169,6 +294,7 @@ protected ISerializationConverter createExternalCon case DATE: case TIMESTAMP_WITH_TIME_ZONE: case TIMESTAMP_WITHOUT_TIME_ZONE: + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: return (val, index, statement) -> statement.setTimestamp( index, ((ColumnRowData) val).getField(index).asTimestamp()); @@ -188,4 +314,32 @@ protected ISerializationConverter createExternalCon throw new UnsupportedOperationException("Unsupported type:" + type); } } + + public static Timestamp toTimestamp2(byte[] var1, String zone) { + int[] var2 = new int[13]; + + int var3; + for (var3 = 0; var3 < 13; ++var3) { + var2[var3] = var1[var3] & 255; + } + + var3 = TIMESTAMP.getJavaYear(var2[0], var2[1]); + + var4.clear(); + var4.set(1, var3); + var4.set(2, var2[2] - 1); + var4.set(5, var2[3]); + var4.set(11, var2[4] - 1); + var4.set(12, var2[5] - 1); + var4.set(13, var2[6] - 1); + var4.set(14, 0); + long var5 = var4.getTime().getTime(); + Timestamp var7 = new Timestamp(var5); + int var8 = TIMESTAMP.getNanos(var1, 7); + var7.setNanos(var8); + if (StringUtils.isNotBlank(zone)) { + var7 = Timestamp.valueOf(var7.toInstant().atZone(ZoneId.of(zone)).toLocalDateTime()); + } + return var7; + } } diff --git a/chunjun-connectors/chunjun-connector-oracle/src/main/java/com/dtstack/chunjun/connector/oracle/source/OracleInputFormat.java b/chunjun-connectors/chunjun-connector-oracle/src/main/java/com/dtstack/chunjun/connector/oracle/source/OracleInputFormat.java index df61ce1067..a33990b5a8 100644 --- a/chunjun-connectors/chunjun-connector-oracle/src/main/java/com/dtstack/chunjun/connector/oracle/source/OracleInputFormat.java +++ b/chunjun-connectors/chunjun-connector-oracle/src/main/java/com/dtstack/chunjun/connector/oracle/source/OracleInputFormat.java @@ -19,10 +19,83 @@ package com.dtstack.chunjun.connector.oracle.source; import com.dtstack.chunjun.connector.jdbc.source.JdbcInputFormat; +import com.dtstack.chunjun.connector.oracle.converter.BlobType; +import com.dtstack.chunjun.connector.oracle.converter.ClobType; +import com.dtstack.chunjun.util.TableUtil; + +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; + +import oracle.jdbc.OracleStatement; +import oracle.jdbc.OracleTypes; + +import java.sql.SQLException; +import java.sql.Statement; +import java.sql.Types; +import java.util.ArrayList; /** * company www.dtstack.com * * @author jier */ -public class OracleInputFormat extends JdbcInputFormat {} +public class OracleInputFormat extends JdbcInputFormat { + + @Override + protected void defineColumnType(Statement statement) throws SQLException { + if (statement instanceof oracle.jdbc.OracleStatement) { + OracleStatement oracleStatement = (OracleStatement) statement; + ArrayList names = new ArrayList<>(); + ArrayList types = new ArrayList<>(); + jdbcConf.getColumn() + .forEach( + i -> { + names.add(i.getName()); + types.add(i.getType()); + }); + RowType rowType = + TableUtil.createRowType(names, types, jdbcDialect.getRawTypeConverter()); + OracleStatement oraclePreparedStatement = (OracleStatement) statement; + oraclePreparedStatement.clearDefines(); + for (int i = 0; i < rowType.getFieldCount(); i++) { + defineType(i, rowType.getTypeAt(i), oracleStatement); + } + } + } + + protected void defineType(int position, LogicalType logicalType, OracleStatement statement) + throws SQLException { + switch (logicalType.getTypeRoot()) { + case SMALLINT: + case INTEGER: + case FLOAT: + case DOUBLE: + case BIGINT: + case DECIMAL: + statement.defineColumnType(position + 1, Types.VARCHAR); + break; + case CHAR: + case VARCHAR: + if (!(logicalType instanceof ClobType)) { + statement.defineColumnType(position + 1, Types.VARCHAR); + } + break; + case DATE: + case TIMESTAMP_WITHOUT_TIME_ZONE: + statement.defineColumnType(position + 1, Types.TIMESTAMP); + break; + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + statement.defineColumnType(position + 1, OracleTypes.TIMESTAMPLTZ); + break; + case TIMESTAMP_WITH_TIME_ZONE: + statement.defineColumnType(position + 1, OracleTypes.TIMESTAMPTZ); + break; + case BINARY: + case VARBINARY: + if (!(logicalType instanceof BlobType)) { + statement.defineColumnType(position + 1, Types.VARBINARY); + } + break; + } + } +} diff --git a/chunjun-connectors/chunjun-connector-postgresql/src/main/java/com/dtstack/chunjun/connector/postgresql/dialect/PostgresqlDialect.java b/chunjun-connectors/chunjun-connector-postgresql/src/main/java/com/dtstack/chunjun/connector/postgresql/dialect/PostgresqlDialect.java index 783e5fe56f..87c3f04125 100644 --- a/chunjun-connectors/chunjun-connector-postgresql/src/main/java/com/dtstack/chunjun/connector/postgresql/dialect/PostgresqlDialect.java +++ b/chunjun-connectors/chunjun-connector-postgresql/src/main/java/com/dtstack/chunjun/connector/postgresql/dialect/PostgresqlDialect.java @@ -47,9 +47,9 @@ public class PostgresqlDialect implements JdbcDialect { private static final String DIALECT_NAME = "PostgreSQL"; private static final String DRIVER = "org.postgresql.Driver"; - private static final String URL_START = "jdbc:postgresql:"; + public static final String URL_START = "jdbc:postgresql:"; - private static final String COPY_SQL_TEMPL = + protected static final String COPY_SQL_TEMPL = "copy %s(%s) from stdin DELIMITER '%s' NULL as '%s'"; @Override @@ -140,15 +140,22 @@ public String getSelectFromStatement( } public String getCopyStatement( - String tableName, String[] fields, String fieldDelimiter, String nullVal) { + String schemaName, + String tableName, + String[] fields, + String fieldDelimiter, + String nullVal) { String fieldsExpression = Arrays.stream(fields).map(this::quoteIdentifier).collect(Collectors.joining(", ")); + String tableLocation; + if (schemaName != null && !"".equals(schemaName.trim())) { + tableLocation = quoteIdentifier(schemaName) + "." + quoteIdentifier(tableName); + } else { + tableLocation = quoteIdentifier(tableName); + } + return String.format( - COPY_SQL_TEMPL, - quoteIdentifier(tableName), - fieldsExpression, - fieldDelimiter, - nullVal); + COPY_SQL_TEMPL, tableLocation, fieldsExpression, fieldDelimiter, nullVal); } } diff --git a/chunjun-connectors/chunjun-connector-postgresql/src/main/java/com/dtstack/chunjun/connector/postgresql/sink/PostgresOutputFormat.java b/chunjun-connectors/chunjun-connector-postgresql/src/main/java/com/dtstack/chunjun/connector/postgresql/sink/PostgresOutputFormat.java index f39d02b38e..54161cdb98 100644 --- a/chunjun-connectors/chunjun-connector-postgresql/src/main/java/com/dtstack/chunjun/connector/postgresql/sink/PostgresOutputFormat.java +++ b/chunjun-connectors/chunjun-connector-postgresql/src/main/java/com/dtstack/chunjun/connector/postgresql/sink/PostgresOutputFormat.java @@ -76,6 +76,7 @@ protected void openInternal(int taskNumber, int numTasks) { PostgresqlDialect pgDialect = (PostgresqlDialect) jdbcDialect; copySql = pgDialect.getCopyStatement( + jdbcConf.getSchema(), jdbcConf.getTable(), columnNameList.toArray(new String[0]), StringUtils.isNullOrWhitespaceOnly(jdbcConf.getFieldDelim().trim()) diff --git a/chunjun-connectors/chunjun-connector-vertica11/src/main/java/com/dtstack/chunjun/connector/vertica11/sink/Vertica11OutputFormat.java b/chunjun-connectors/chunjun-connector-vertica11/src/main/java/com/dtstack/chunjun/connector/vertica11/sink/Vertica11OutputFormat.java index 2d0b90a2ba..004164aafe 100644 --- a/chunjun-connectors/chunjun-connector-vertica11/src/main/java/com/dtstack/chunjun/connector/vertica11/sink/Vertica11OutputFormat.java +++ b/chunjun-connectors/chunjun-connector-vertica11/src/main/java/com/dtstack/chunjun/connector/vertica11/sink/Vertica11OutputFormat.java @@ -23,7 +23,6 @@ import com.dtstack.chunjun.connector.jdbc.util.JdbcUtil; import com.dtstack.chunjun.connector.vertica11.dialect.Vertica11Dialect; import com.dtstack.chunjun.enums.EWriteMode; -import com.dtstack.chunjun.enums.Semantic; import com.dtstack.chunjun.util.JsonUtil; import org.apache.commons.collections.CollectionUtils; @@ -39,9 +38,7 @@ protected void openInternal(int taskNumber, int numTasks) { dbConn = getConnection(); // By default, transaction auto-commit is turned off, and transactions are manually // controlled - if (Semantic.EXACTLY_ONCE == semantic) { - dbConn.setAutoCommit(false); - } + dbConn.setAutoCommit(jdbcConf.isAutoCommit()); if (!EWriteMode.INSERT.name().equalsIgnoreCase(jdbcConf.getMode())) { List updateKey = jdbcConf.getUniqueKey(); if (CollectionUtils.isEmpty(updateKey)) {