diff --git a/seatunnel-connectors-v2/connector-jdbc/pom.xml b/seatunnel-connectors-v2/connector-jdbc/pom.xml index 7e49bcce0cea..9721870da9df 100644 --- a/seatunnel-connectors-v2/connector-jdbc/pom.xml +++ b/seatunnel-connectors-v2/connector-jdbc/pom.xml @@ -108,6 +108,10 @@ com.oracle.database.jdbc ojdbc8 + + com.ibm.db2 + jcc + - + \ No newline at end of file diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/db2/DB2Dialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/db2/DB2Dialect.java new file mode 100644 index 000000000000..adb09419de2b --- /dev/null +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/db2/DB2Dialect.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.db2; + +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper; + +public class DB2Dialect implements JdbcDialect { + + @Override + public String dialectName() { + return "DB2"; + } + + @Override + public JdbcRowConverter getRowConverter() { + return new DB2JdbcRowConverter(); + } + + @Override + public JdbcDialectTypeMapper getJdbcDialectTypeMapper() { + return new DB2TypeMapper(); + } +} diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/db2/DB2DialectFactory.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/db2/DB2DialectFactory.java new file mode 100644 index 000000000000..acc467f64018 --- /dev/null +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/db2/DB2DialectFactory.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.db2; + + +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectFactory; + +import com.google.auto.service.AutoService; + +/** + * Factory for {@link DB2Dialect}. + */ + +@AutoService(JdbcDialectFactory.class) +public class DB2DialectFactory implements JdbcDialectFactory { + + @Override + public boolean acceptsURL(String url) { + return url.startsWith("jdbc:mysql:"); + } + + @Override + public JdbcDialect create() { + return new DB2Dialect(); + } +} diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/db2/DB2JdbcRowConverter.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/db2/DB2JdbcRowConverter.java new file mode 100644 index 000000000000..80076c193d7d --- /dev/null +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/db2/DB2JdbcRowConverter.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.db2; + +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.AbstractJdbcRowConverter; + +public class DB2JdbcRowConverter extends AbstractJdbcRowConverter { + @Override + public String converterName() { + return "DB2"; + } + + @Override + public SeaTunnelRow toInternal(ResultSet rs, ResultSetMetaData metaData, SeaTunnelRowType typeInfo) throws SQLException { + return super.toInternal(rs, metaData, typeInfo); + } +} diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/db2/DB2TypeMapper.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/db2/DB2TypeMapper.java new file mode 100644 index 000000000000..a8c84eb2cf3a --- /dev/null +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/db2/DB2TypeMapper.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.db2; + + +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.ResultSetMetaData; +import java.sql.SQLException; + +public class DB2TypeMapper implements JdbcDialectTypeMapper { + + private static final Logger LOG = LoggerFactory.getLogger(JdbcDialect.class); + + // reference https://www.ibm.com/docs/en/ssw_ibm_i_75/pdf/rbafzpdf.pdf + + // ============================data types===================== + private static final String DB2_BOOLEAN = "BOOLEAN"; + private static final String DB2_ROWID = "ROWID"; + private static final String DB2_SMALLINT = "SMALLINT"; + private static final String DB2_INTEGER = "INTEGER"; + private static final String DB2_INT = "INT"; + private static final String DB2_BIGINT = "BIGINT"; + // exact + private static final String DB2_DECIMAL = "DECIMAL"; + private static final String DB2_DEC = "DEC"; + private static final String DB2_NUMERIC = "NUMERIC"; + private static final String DB2_NUM = "NUM"; + + + // float + private static final String DB2_REAL = "REAL"; + private static final String DB2_FLOAT = "FLOAT"; + private static final String DB2_DOUBLE = "DOUBLE"; + private static final String DB2_DOUBLE_PRECISION = "DOUBLE PRECISION"; + private static final String DB2_DECFLOAT = "DECFLOAT"; + private static final String DB2_DECDOUBLE = "DECDOUBLE"; + + // string + private static final String DB2_CHAR = "CHAR"; + private static final String DB2_VARCHAR = "VARCHAR"; + private static final String DB2_CLOB = "CLOB"; + private static final String DB2_LONGVARCHAR = "LONG VARCHAR"; + // graphic + private static final String DB2_GRAPHIC = "GRAPHIC"; + private static final String DB2_VARGRAPHIC = "VARGRAPHIC"; + private static final String DB2_LONG_VARGRAPHIC = "LONG VARGRAPHIC"; + private static final String DB2_DBCLOB = "DBCLOB"; + + // ---------------------------binary--------------------------- + private static final String DB2_BINARY = "BINARY"; + private static final String DB2_VARBINARY = "VARBINARY"; + + // ------------------------------time------------------------- + private static final String DB2_DATE = "DATE"; + private static final String DB2_TIME = "TIME"; + private static final String DB2_TIMESTAMP = "TIMESTAMP"; + + + // ------------------------------blob------------------------- + private static final String DB2_BLOB = "BLOB"; + private static final String DB2_NCHAR_MAPPING = "NCHAR_MAPPING"; + + // other + private static final String DB2_XML = "XML"; + private static final String DB2_LOB = "XML"; + private static final String DB2_DATALINK = "DATALINK"; + + + @SuppressWarnings("checkstyle:MagicNumber") + @Override + public SeaTunnelDataType mapping(ResultSetMetaData metadata, int colIndex) throws SQLException { + String columnType = metadata.getColumnTypeName(colIndex).toUpperCase(); + int precision = metadata.getPrecision(colIndex); + int scale = metadata.getScale(colIndex); + switch (columnType) { + case DB2_BOOLEAN: + + default: + final String jdbcColumnName = metadata.getColumnName(colIndex); + throw new UnsupportedOperationException( + String.format("Doesn't support DM2 type '%s' on column '%s' yet.", columnType, jdbcColumnName)); + } + } +}