Skip to content

Commit

Permalink
[Feature][Jdbc] Support hive compatibleMode add inceptor dialect (apa…
Browse files Browse the repository at this point in the history
  • Loading branch information
Xuzhengz authored Jul 26, 2024
1 parent d003bd8 commit 31e59cd
Show file tree
Hide file tree
Showing 5 changed files with 223 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,5 @@ public class DatabaseIdentifier {
public static final String TIDB = "TiDB";
public static final String XUGU = "XUGU";
public static final String IRIS = "IRIS";
public static final String INCEPTOR = "Inceptor";
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectFactory;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.inceptor.InceptorDialect;

import com.google.auto.service.AutoService;

Expand All @@ -33,6 +34,15 @@ public boolean acceptsURL(String url) {

@Override
public JdbcDialect create() {
throw new UnsupportedOperationException(
"Can't create JdbcDialect without compatible mode for Hive");
}

@Override
public JdbcDialect create(String compatibleMode, String fieldId) {
if ("inceptor".equals(compatibleMode)) {
return new InceptorDialect();
}
return new HiveDialect();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.inceptor;

import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.hive.HiveDialect;

public class InceptorDialect extends HiveDialect {

@Override
public String dialectName() {
return DatabaseIdentifier.INCEPTOR;
}

@Override
public JdbcRowConverter getRowConverter() {
return new InceptorJdbcRowConverter();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.inceptor;

import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.type.ArrayType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.api.table.type.SqlType;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.hive.HiveJdbcRowConverter;

import org.apache.commons.lang3.StringUtils;

import java.math.BigDecimal;
import java.sql.PreparedStatement;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;

public class InceptorJdbcRowConverter extends HiveJdbcRowConverter {

@Override
public String converterName() {
return DatabaseIdentifier.INCEPTOR;
}

@Override
public PreparedStatement toExternal(
TableSchema tableSchema, SeaTunnelRow row, PreparedStatement statement) {
SeaTunnelRowType rowType = tableSchema.toPhysicalRowDataType();
for (int fieldIndex = 0; fieldIndex < rowType.getTotalFields(); fieldIndex++) {
try {
SeaTunnelDataType<?> seaTunnelDataType = rowType.getFieldType(fieldIndex);
int statementIndex = fieldIndex + 1;
Object fieldValue = row.getField(fieldIndex);
if (fieldValue == null) {
statement.setObject(statementIndex, StringUtils.EMPTY);
continue;
}
switch (seaTunnelDataType.getSqlType()) {
case STRING:
statement.setString(statementIndex, (String) row.getField(fieldIndex));
break;
case BOOLEAN:
statement.setBoolean(statementIndex, (Boolean) row.getField(fieldIndex));
break;
case TINYINT:
statement.setByte(statementIndex, (Byte) row.getField(fieldIndex));
break;
case SMALLINT:
statement.setShort(statementIndex, (Short) row.getField(fieldIndex));
break;
case INT:
statement.setInt(statementIndex, (Integer) row.getField(fieldIndex));
break;
case BIGINT:
statement.setLong(statementIndex, (Long) row.getField(fieldIndex));
break;
case FLOAT:
statement.setFloat(statementIndex, (Float) row.getField(fieldIndex));
break;
case DOUBLE:
statement.setDouble(statementIndex, (Double) row.getField(fieldIndex));
break;
case DECIMAL:
statement.setBigDecimal(
statementIndex, (BigDecimal) row.getField(fieldIndex));
break;
case DATE:
LocalDate localDate = (LocalDate) row.getField(fieldIndex);
statement.setDate(statementIndex, java.sql.Date.valueOf(localDate));
break;
case TIME:
writeTime(statement, statementIndex, (LocalTime) row.getField(fieldIndex));
break;
case TIMESTAMP:
LocalDateTime localDateTime = (LocalDateTime) row.getField(fieldIndex);
statement.setTimestamp(
statementIndex, java.sql.Timestamp.valueOf(localDateTime));
break;
case BYTES:
statement.setBytes(statementIndex, (byte[]) row.getField(fieldIndex));
break;
case NULL:
statement.setNull(statementIndex, java.sql.Types.NULL);
break;
case ARRAY:
SeaTunnelDataType elementType =
((ArrayType) seaTunnelDataType).getElementType();
Object[] array = (Object[]) row.getField(fieldIndex);
if (array == null) {
statement.setNull(statementIndex, java.sql.Types.ARRAY);
break;
}
if (SqlType.TINYINT.equals(elementType.getSqlType())) {
Short[] shortArray = new Short[array.length];
for (int i = 0; i < array.length; i++) {
shortArray[i] = Short.valueOf(array[i].toString());
}
statement.setObject(statementIndex, shortArray);
} else {
statement.setObject(statementIndex, array);
}
break;
case MAP:
case ROW:
default:
throw new JdbcConnectorException(
CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,
"Unexpected value: " + seaTunnelDataType);
}
} catch (Exception e) {
throw new JdbcConnectorException(
JdbcConnectorErrorCode.DATA_TYPE_CAST_FAILED,
"error field:" + rowType.getFieldNames()[fieldIndex],
e);
}
}
return statement;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.hive;

import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.inceptor.InceptorDialect;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

public class HiveDialectFactoryTest {

@Test
public void testWithCompatibleMode() {
HiveDialectFactory hiveDialectFactory = new HiveDialectFactory();
JdbcDialect inceptorDialect = hiveDialectFactory.create("inceptor", "");
Assertions.assertTrue(inceptorDialect instanceof InceptorDialect);
JdbcDialect hiveDialect = hiveDialectFactory.create("", "");
Assertions.assertTrue(hiveDialect instanceof HiveDialect);
}
}

0 comments on commit 31e59cd

Please sign in to comment.