diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/db2/DB2TypeMapper.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/db2/DB2TypeMapper.java index a8c84eb2cf3a..959425612e59 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/db2/DB2TypeMapper.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/db2/DB2TypeMapper.java @@ -18,6 +18,10 @@ package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.db2; +import org.apache.seatunnel.api.table.type.BasicType; +import org.apache.seatunnel.api.table.type.DecimalType; +import org.apache.seatunnel.api.table.type.LocalTimeType; +import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType; import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper; @@ -32,9 +36,9 @@ public class DB2TypeMapper implements JdbcDialectTypeMapper { private static final Logger LOG = LoggerFactory.getLogger(JdbcDialect.class); // reference https://www.ibm.com/docs/en/ssw_ibm_i_75/pdf/rbafzpdf.pdf - // ============================data types===================== private static final String DB2_BOOLEAN = "BOOLEAN"; + private static final String DB2_ROWID = "ROWID"; private static final String DB2_SMALLINT = "SMALLINT"; private static final String DB2_INTEGER = "INTEGER"; @@ -59,7 +63,7 @@ public class DB2TypeMapper implements JdbcDialectTypeMapper { private static final String DB2_CHAR = "CHAR"; private static final String DB2_VARCHAR = "VARCHAR"; private static final String DB2_CLOB = "CLOB"; - private static final String DB2_LONGVARCHAR = "LONG VARCHAR"; + private static final String DB2_LONG_VARCHAR = "LONG VARCHAR"; // graphic private static final String DB2_GRAPHIC = "GRAPHIC"; private static final String DB2_VARGRAPHIC = "VARGRAPHIC"; @@ -82,7 +86,7 @@ public class DB2TypeMapper implements JdbcDialectTypeMapper { // other private static final String DB2_XML = "XML"; - private static final String DB2_LOB = "XML"; + private static final String DB2_LOB = "LOB"; private static final String DB2_DATALINK = "DATALINK"; @@ -91,10 +95,57 @@ public class DB2TypeMapper implements JdbcDialectTypeMapper { public SeaTunnelDataType mapping(ResultSetMetaData metadata, int colIndex) throws SQLException { String columnType = metadata.getColumnTypeName(colIndex).toUpperCase(); int precision = metadata.getPrecision(colIndex); - int scale = metadata.getScale(colIndex); switch (columnType) { case DB2_BOOLEAN: - + return BasicType.BOOLEAN_TYPE; + case DB2_SMALLINT: + return BasicType.SHORT_TYPE; + case DB2_INT: + case DB2_INTEGER: + return BasicType.INT_TYPE; + case DB2_BIGINT: + return BasicType.LONG_TYPE; + case DB2_DECIMAL: + case DB2_DEC: + case DB2_NUMERIC: + case DB2_NUM: + if (precision > 0) { + return new DecimalType(precision, metadata.getScale(colIndex)); + } + return new DecimalType(38, 18); + case DB2_REAL: + return BasicType.FLOAT_TYPE; + case DB2_FLOAT: + case DB2_DOUBLE: + case DB2_DOUBLE_PRECISION: + case DB2_DECFLOAT: + case DB2_DECDOUBLE: + return BasicType.DOUBLE_TYPE; + case DB2_CHAR: + case DB2_VARCHAR: + case DB2_CLOB: + case DB2_LONG_VARCHAR: + case DB2_GRAPHIC: + case DB2_VARGRAPHIC: + case DB2_LONG_VARGRAPHIC: + case DB2_DBCLOB: + return BasicType.STRING_TYPE; + case DB2_BINARY: + case DB2_VARBINARY: + case DB2_BLOB: + case DB2_NCHAR_MAPPING: + return PrimitiveByteArrayType.INSTANCE; + case DB2_DATE: + return LocalTimeType.LOCAL_DATE_TYPE; + case DB2_TIME: + return LocalTimeType.LOCAL_TIME_TYPE; + case DB2_TIMESTAMP: + return LocalTimeType.LOCAL_DATE_TIME_TYPE; + case DB2_ROWID: + // should support + case DB2_XML: + case DB2_LOB: + case DB2_DATALINK: default: final String jdbcColumnName = metadata.getColumnName(colIndex); throw new UnsupportedOperationException(