Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feature][doris] Doris factory type #5061

Merged
merged 4 commits into from
Aug 6, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.doris.config;

import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;

import com.google.auto.service.AutoService;

@AutoService(Factory.class)
public class DorisSinkFactory implements TableSinkFactory {

public static final String IDENTIFIER = "Doris";

@Override
public String factoryIdentifier() {
return IDENTIFIER;
}

@Override
public OptionRule optionRule() {
return OptionRule.builder()
.required(
DorisConfig.FENODES,
DorisConfig.USERNAME,
DorisConfig.PASSWORD,
DorisConfig.SINK_LABEL_PREFIX,
DorisConfig.DORIS_SINK_CONFIG_PREFIX)
.optional(DorisConfig.SINK_ENABLE_2PC, DorisConfig.SINK_ENABLE_DELETE)
.build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,211 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.doris.datatype;

import org.apache.seatunnel.api.table.catalog.DataTypeConvertException;
import org.apache.seatunnel.api.table.catalog.DataTypeConvertor;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.DecimalType;
import org.apache.seatunnel.api.table.type.LocalTimeType;
import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SqlType;

import org.apache.commons.collections4.MapUtils;

import com.google.auto.service.AutoService;
import com.google.common.collect.ImmutableMap;

import java.util.Collections;
import java.util.Locale;
import java.util.Map;

import static com.google.common.base.Preconditions.checkNotNull;

@AutoService(DataTypeConvertor.class)
public class DorisDataTypeConvertor implements DataTypeConvertor<String> {

public static final String NULL = "NULL";
public static final String BOOLEAN = "BOOLEAN";
public static final String TINYINT = "TINYINT";
public static final String SMALLINT = "SMALLINT";
public static final String INT = "INT";
public static final String BIGINT = "BIGINT";
public static final String FLOAT = "FLOAT";
public static final String DOUBLE = "DOUBLE";
public static final String DECIMAL = "DECIMAL";
public static final String DATE = "DATE";
public static final String DATETIME = "DATETIME";
public static final String CHAR = "CHAR";
public static final String VARCHAR = "VARCHAR";
public static final String BINARY = "BINARY";
public static final String VARBINARY = "VARBINARY";
public static final String ARRAY = "ARRAY";
public static final String MAP = "MAP";
public static final String STRUCT = "STRUCT";
public static final String UNION = "UNION";
public static final String INTERVAL = "INTERVAL";
public static final String TIMESTAMP = "TIMESTAMP";
public static final String YEAR = "YEAR";
public static final String GEOMETRY = "GEOMETRY";
public static final String IP = "IP";

public static final String PRECISION = "precision";
public static final String SCALE = "scale";

public static final Integer DEFAULT_PRECISION = 10;

public static final Integer DEFAULT_SCALE = 0;

@Override
public SeaTunnelDataType<?> toSeaTunnelType(String connectorDataType) {
checkNotNull(connectorDataType, "connectorDataType can not be null");
Map<String, Object> dataTypeProperties;
switch (connectorDataType.toUpperCase(Locale.ROOT)) {
case DECIMAL:
// parse precision and scale
int left = connectorDataType.indexOf("(");
int right = connectorDataType.indexOf(")");
int precision = DEFAULT_PRECISION;
int scale = DEFAULT_SCALE;
if (left != -1 && right != -1) {
String[] precisionAndScale =
connectorDataType.substring(left + 1, right).split(",");
if (precisionAndScale.length == 2) {
precision = Integer.parseInt(precisionAndScale[0]);
scale = Integer.parseInt(precisionAndScale[1]);
} else if (precisionAndScale.length == 1) {
precision = Integer.parseInt(precisionAndScale[0]);
}
}
dataTypeProperties = ImmutableMap.of(PRECISION, precision, SCALE, scale);
break;
default:
dataTypeProperties = Collections.emptyMap();
break;
}
return toSeaTunnelType(connectorDataType, dataTypeProperties);
}

@Override
public SeaTunnelDataType<?> toSeaTunnelType(
String connectorDataType, Map<String, Object> dataTypeProperties)
throws DataTypeConvertException {
checkNotNull(connectorDataType, "mysqlType can not be null");
int precision;
int scale;
switch (connectorDataType.toUpperCase(Locale.ROOT)) {
case NULL:
return BasicType.VOID_TYPE;
case BOOLEAN:
return BasicType.BOOLEAN_TYPE;
// case BIT:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

clean this comment?

// precision = (Integer)
// dataTypeProperties.get(MysqlDataTypeConvertor.PRECISION);
// if (precision == 1) {
// return BasicType.BOOLEAN_TYPE;
// } else {
// return PrimitiveByteArrayType.INSTANCE;
// }
case TINYINT:
return BasicType.BYTE_TYPE;
case SMALLINT:
return BasicType.SHORT_TYPE;
case INT:
case YEAR:
return BasicType.INT_TYPE;
case BIGINT:
return BasicType.LONG_TYPE;
case FLOAT:
return BasicType.FLOAT_TYPE;
case DOUBLE:
return BasicType.DOUBLE_TYPE;
// case TIME:
// return LocalTimeType.LOCAL_TIME_TYPE;
case DATE:
return LocalTimeType.LOCAL_DATE_TYPE;
case TIMESTAMP:
case DATETIME:
return LocalTimeType.LOCAL_DATE_TIME_TYPE;
case CHAR:
case VARCHAR:
return BasicType.STRING_TYPE;
case BINARY:
case VARBINARY:
case GEOMETRY:
return PrimitiveByteArrayType.INSTANCE;
case DECIMAL:
precision = MapUtils.getInteger(dataTypeProperties, PRECISION, DEFAULT_PRECISION);
scale = MapUtils.getInteger(dataTypeProperties, SCALE, DEFAULT_SCALE);
return new DecimalType(precision, scale);
default:
throw new UnsupportedOperationException(
String.format("Doesn't support DORIS type '%s'' yet.", connectorDataType));
}
}

@Override
public String toConnectorType(
SeaTunnelDataType<?> seaTunnelDataType, Map<String, Object> dataTypeProperties)
throws DataTypeConvertException {
checkNotNull(seaTunnelDataType, "seaTunnelDataType cannot be null");
SqlType sqlType = seaTunnelDataType.getSqlType();
// todo: verify
switch (sqlType) {
case ARRAY:
return ARRAY;
case MAP:
case ROW:
case STRING:
case NULL:
return VARCHAR;
case BOOLEAN:
return BOOLEAN;

case TINYINT:
return TINYINT;
case SMALLINT:
return SMALLINT;
case INT:
return INT;
case BIGINT:
return BIGINT;
case FLOAT:
return FLOAT;
case DOUBLE:
return DOUBLE;
case DECIMAL:
return DECIMAL;
case BYTES:
return BINARY;
case DATE:
return DATE;
case TIME:
case TIMESTAMP:
return TIMESTAMP;
default:
throw new UnsupportedOperationException(
String.format("Doesn't support Doris type '%s'' yet.", sqlType));
}
}

@Override
public String getIdentity() {
return "Doris";
}
}