diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Column.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Column.java index 01d2a2874ce..393049162d1 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Column.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Column.java @@ -34,11 +34,11 @@ public abstract class Column { /** * Data type of the column. */ - protected final DataType dataType; + protected final DataType dataType; protected final String comment; - private Column(String name, DataType dataType, String comment) { + private Column(String name, DataType dataType, String comment) { this.name = name; this.dataType = dataType; this.comment = comment; @@ -47,7 +47,7 @@ private Column(String name, DataType dataType, String comment) { /** * Creates a regular table column that represents physical data. */ - public static PhysicalColumn physical(String name, DataType dataType) { + public static PhysicalColumn physical(String name, DataType dataType) { return new PhysicalColumn(name, dataType); } @@ -58,7 +58,7 @@ public static PhysicalColumn physical(String name, DataType dataType) { *

Allows to specify whether the column is virtual or not. */ public static MetadataColumn metadata( - String name, DataType dataType, String metadataKey) { + String name, DataType dataType, String metadataKey) { return new MetadataColumn(name, dataType, metadataKey); } @@ -76,7 +76,7 @@ public static MetadataColumn metadata( /** * Returns the data type of this column. */ - public DataType getDataType() { + public DataType getDataType() { return this.dataType; } @@ -97,7 +97,7 @@ public Optional getComment() { /** * Returns a copy of the column with a replaced {@link DataType}. */ - public abstract Column copy(DataType newType); + public abstract Column copy(DataType newType); @Override public boolean equals(Object o) { @@ -127,11 +127,11 @@ public int hashCode() { */ public static final class PhysicalColumn extends Column { - private PhysicalColumn(String name, DataType dataType) { + private PhysicalColumn(String name, DataType dataType) { this(name, dataType, null); } - private PhysicalColumn(String name, DataType dataType, String comment) { + private PhysicalColumn(String name, DataType dataType, String comment) { super(name, dataType, comment); } @@ -149,7 +149,7 @@ public boolean isPhysical() { } @Override - public Column copy(DataType newDataType) { + public Column copy(DataType newDataType) { return new PhysicalColumn(name, newDataType, comment); } } @@ -162,13 +162,13 @@ public static final class MetadataColumn extends Column { private final String metadataKey; private MetadataColumn( - String name, DataType dataType, String metadataKey) { + String name, DataType dataType, String metadataKey) { this(name, dataType, metadataKey, null); } private MetadataColumn( String name, - DataType dataType, + DataType dataType, String metadataKey, String comment) { super(name, dataType, comment); @@ -193,7 +193,7 @@ public boolean isPhysical() { } @Override - public Column copy(DataType newDataType) { + public Column copy(DataType newDataType) { return new MetadataColumn(name, newDataType, metadataKey, comment); } diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/SupportReadingMetadata.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/SupportReadingMetadata.java index 3fc90e17324..661885807f7 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/SupportReadingMetadata.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/SupportReadingMetadata.java @@ -28,7 +28,7 @@ */ public interface SupportReadingMetadata { - Map listReadableMetadata(CatalogTable catalogTable); + Map> listReadableMetadata(CatalogTable catalogTable); - void applyReadableMetadata(CatalogTable catalogTable, List metadataKeys, DataType dataType); + void applyReadableMetadata(CatalogTable catalogTable, List metadataKeys, DataType dataType); } diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/ArrayType.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/ArrayType.java new file mode 100644 index 00000000000..1e24cd25159 --- /dev/null +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/ArrayType.java @@ -0,0 +1,15 @@ +package org.apache.seatunnel.api.table.type; + +public class ArrayType implements DataType { + + private final BasicType elementType; + + public ArrayType(BasicType elementType) { + this.elementType = elementType; + } + + public BasicType getElementType() { + return elementType; + } + +} diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/BasicType.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/BasicType.java index 13489f228fc..c2834f7e048 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/BasicType.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/BasicType.java @@ -17,6 +17,9 @@ package org.apache.seatunnel.api.table.type; +import java.math.BigDecimal; +import java.math.BigInteger; +import java.time.Instant; import java.util.Date; public class BasicType implements DataType { @@ -29,19 +32,50 @@ public class BasicType implements DataType { public static final BasicType LONG = new BasicType<>(Long.class); public static final BasicType FLOAT = new BasicType<>(Float.class); public static final BasicType BYTE = new BasicType<>(Byte.class); + public static final BasicType SHORT = new BasicType<>(Short.class); + public static final BasicType CHARACTER = new BasicType<>(Character.class); + public static final BasicType BIG_INTEGER = new BasicType<>(BigInteger.class); + public static final BasicType BIG_DECIMAL = new BasicType<>(BigDecimal.class); + public static final BasicType INSTANT = new BasicType<>(Instant.class); public static final BasicType NULL = new BasicType<>(Void.class); - private final Class typeClass; + /** + * The physical type class. + */ + private final Class physicalTypeClass; - public BasicType(Class typeClass) { - if (typeClass == null) { - throw new IllegalArgumentException("typeClass cannot be null"); + public BasicType(Class physicalTypeClass) { + if (physicalTypeClass == null) { + throw new IllegalArgumentException("physicalTypeClass cannot be null"); } - this.typeClass = typeClass; + this.physicalTypeClass = physicalTypeClass; + } + + public Class getPhysicalTypeClass() { + return this.physicalTypeClass; + } + + @Override + public int hashCode() { + return this.physicalTypeClass.hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + BasicType other = (BasicType) obj; + return this.physicalTypeClass.equals(other.physicalTypeClass); } @Override - public Class getTypeClass() { - return this.typeClass; + public String toString() { + return "BasicType{" + + "physicalTypeClass=" + physicalTypeClass + + '}'; } } diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/DataType.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/DataType.java index 0c65f471013..2183ad18134 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/DataType.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/DataType.java @@ -18,15 +18,9 @@ package org.apache.seatunnel.api.table.type; /** - * Data type of column in SeaTunnel. + * Logic data type of column in SeaTunnel. */ public interface DataType { - /** - * The type class. - * - * @return the type class. - */ - Class getTypeClass(); } diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/EnumType.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/EnumType.java new file mode 100644 index 00000000000..9b4f851744a --- /dev/null +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/EnumType.java @@ -0,0 +1,13 @@ +package org.apache.seatunnel.api.table.type; + +public class EnumType> implements DataType { + private final Class enumClass; + + public EnumType(Class enumClass) { + this.enumClass = enumClass; + } + + public Class getEnumClass() { + return enumClass; + } +} diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/ListType.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/ListType.java index 54ccad0b4bf..41c570047f9 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/ListType.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/ListType.java @@ -19,15 +19,13 @@ public class ListType implements DataType { - // todo: use DataType? - private final Class typeClass; + private final DataType elementType; - public ListType(Class typeClass) { - this.typeClass = typeClass; + public ListType(DataType elementType) { + this.elementType = elementType; } - @Override - public Class getTypeClass() { - return typeClass; + public DataType getElementType() { + return elementType; } } diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/MapType.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/MapType.java new file mode 100644 index 00000000000..6c8c24b78de --- /dev/null +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/MapType.java @@ -0,0 +1,29 @@ +package org.apache.seatunnel.api.table.type; + +import java.util.Map; + +public class MapType implements DataType> { + + private final DataType keyType; + private final DataType valueType; + + public MapType(DataType keyType, DataType valueType) { + if (keyType == null) { + throw new IllegalArgumentException("keyType cannot be null"); + } + if (valueType == null) { + throw new IllegalArgumentException("valueType cannot be null"); + } + this.keyType = keyType; + this.valueType = valueType; + } + + public DataType getKeyType() { + return keyType; + } + + public DataType getValueType() { + return valueType; + } + +} diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/PojoType.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/PojoType.java index e70fcd62d02..2892d011b43 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/PojoType.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/PojoType.java @@ -17,18 +17,29 @@ package org.apache.seatunnel.api.table.type; -// todo: we may don't need to pojo type +import java.lang.reflect.Field; + public class PojoType implements DataType { - private final Class typeClass; + private final Class pojoClass; + private final Field[] fields; + private final DataType[] fieldTypes; + + public PojoType(Class pojoClass, Field[] fields, DataType[] fieldTypes) { + this.pojoClass = pojoClass; + this.fields = fields; + this.fieldTypes = fieldTypes; + } - public PojoType(Class typeClass) { - this.typeClass = typeClass; + public Class getPojoClass() { + return pojoClass; + } + public Field[] getFields() { + return fields; } - @Override - public Class getTypeClass() { - return typeClass; + public DataType[] getFieldTypes() { + return fieldTypes; } } diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/TimestampType.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/TimestampType.java new file mode 100644 index 00000000000..390ce4533ac --- /dev/null +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/TimestampType.java @@ -0,0 +1,16 @@ +package org.apache.seatunnel.api.table.type; + +import java.sql.Timestamp; + +public class TimestampType implements DataType { + + private final int precision; + + public TimestampType(int precision) { + this.precision = precision; + } + + public int getPrecision() { + return precision; + } +} diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/ArrayTypeConverter.java b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/ArrayTypeConverter.java new file mode 100644 index 00000000000..d4aa1866c75 --- /dev/null +++ b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/ArrayTypeConverter.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.translation.flink.types; + +import org.apache.seatunnel.api.table.type.ArrayType; +import org.apache.seatunnel.api.table.type.BasicType; + +import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; + +public class ArrayTypeConverter implements FlinkTypeConverter, BasicArrayTypeInfo> { + + @Override + @SuppressWarnings("unchecked") + public BasicArrayTypeInfo convert(ArrayType arrayType) { + BasicType elementType = arrayType.getElementType(); + if (BasicType.BOOLEAN.equals(elementType)) { + return (BasicArrayTypeInfo) BasicArrayTypeInfo.BOOLEAN_ARRAY_TYPE_INFO; + } + if (BasicType.STRING.equals(elementType)) { + return (BasicArrayTypeInfo) BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO; + } + if (BasicType.DOUBLE.equals(elementType)) { + return (BasicArrayTypeInfo) BasicArrayTypeInfo.DOUBLE_ARRAY_TYPE_INFO; + } + if (BasicType.INTEGER.equals(elementType)) { + return (BasicArrayTypeInfo) BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO; + } + if (BasicType.LONG.equals(elementType)) { + return (BasicArrayTypeInfo) BasicArrayTypeInfo.LONG_ARRAY_TYPE_INFO; + } + if (BasicType.FLOAT.equals(elementType)) { + return (BasicArrayTypeInfo) BasicArrayTypeInfo.FLOAT_ARRAY_TYPE_INFO; + } + if (BasicType.BYTE.equals(elementType)) { + return (BasicArrayTypeInfo) BasicArrayTypeInfo.BYTE_ARRAY_TYPE_INFO; + } + if (BasicType.SHORT.equals(elementType)) { + return (BasicArrayTypeInfo) BasicArrayTypeInfo.SHORT_ARRAY_TYPE_INFO; + } + if (BasicType.CHARACTER.equals(elementType)) { + return (BasicArrayTypeInfo) BasicArrayTypeInfo.CHAR_ARRAY_TYPE_INFO; + } + throw new IllegalArgumentException("Unsupported basic type: " + elementType); + } +} diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/BasicTypeConverter.java b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/BasicTypeConverter.java index 7a2ee1ff512..ed8967f89c9 100644 --- a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/BasicTypeConverter.java +++ b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/BasicTypeConverter.java @@ -18,70 +18,98 @@ package org.apache.seatunnel.translation.flink.types; import org.apache.seatunnel.api.table.type.BasicType; -import org.apache.seatunnel.api.table.type.DataType; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; +import java.math.BigDecimal; +import java.math.BigInteger; +import java.time.Instant; import java.util.Date; -public class BasicTypeConverter implements FlinkTypeConverter { +public class BasicTypeConverter + implements FlinkTypeConverter, TypeInformation> { - public static final BasicTypeConverter STRING_CONVERTER = + public static final BasicTypeConverter STRING_CONVERTER = new BasicTypeConverter<>( BasicType.STRING, BasicTypeInfo.STRING_TYPE_INFO); - public static final BasicTypeConverter INTEGER_CONVERTER = + public static final BasicTypeConverter INTEGER_CONVERTER = new BasicTypeConverter<>( BasicType.INTEGER, BasicTypeInfo.INT_TYPE_INFO); - public static final BasicTypeConverter BOOLEAN_CONVERTER = + public static final BasicTypeConverter BOOLEAN_CONVERTER = new BasicTypeConverter<>( BasicType.BOOLEAN, BasicTypeInfo.BOOLEAN_TYPE_INFO); - public static final BasicTypeConverter DATE_CONVERTER = + public static final BasicTypeConverter DATE_CONVERTER = new BasicTypeConverter<>( BasicType.DATE, BasicTypeInfo.DATE_TYPE_INFO); - public static final BasicTypeConverter DOUBLE_CONVERTER = + public static final BasicTypeConverter DOUBLE_CONVERTER = new BasicTypeConverter<>( BasicType.DOUBLE, BasicTypeInfo.DOUBLE_TYPE_INFO); - public static final BasicTypeConverter LONG_CONVERTER = + public static final BasicTypeConverter LONG_CONVERTER = new BasicTypeConverter<>( BasicType.LONG, BasicTypeInfo.LONG_TYPE_INFO); - public static final BasicTypeConverter FLOAT_CONVERTER = + public static final BasicTypeConverter FLOAT_CONVERTER = new BasicTypeConverter<>( BasicType.FLOAT, BasicTypeInfo.FLOAT_TYPE_INFO); - public static final BasicTypeConverter BYTE_CONVERTER = + public static final BasicTypeConverter BYTE_CONVERTER = new BasicTypeConverter<>( BasicType.BYTE, BasicTypeInfo.BYTE_TYPE_INFO); - public static final BasicTypeConverter NULL_CONVERTER = + public static final BasicTypeConverter SHORT_CONVERTER = + new BasicTypeConverter<>( + BasicType.SHORT, + BasicTypeInfo.SHORT_TYPE_INFO); + + public static final BasicTypeConverter CHARACTER_CONVERTER = + new BasicTypeConverter<>( + BasicType.CHARACTER, + BasicTypeInfo.CHAR_TYPE_INFO); + + public static final BasicTypeConverter BIG_INTEGER_CONVERTER = + new BasicTypeConverter<>( + BasicType.BIG_INTEGER, + BasicTypeInfo.BIG_INT_TYPE_INFO); + + public static final BasicTypeConverter BIG_DECIMAL = + new BasicTypeConverter<>( + BasicType.BIG_DECIMAL, + BasicTypeInfo.BIG_DEC_TYPE_INFO); + + public static final BasicTypeConverter INSTANT_CONVERTER = + new BasicTypeConverter<>( + BasicType.INSTANT, + BasicTypeInfo.INSTANT_TYPE_INFO); + + public static final BasicTypeConverter NULL_CONVERTER = new BasicTypeConverter<>( BasicType.NULL, BasicTypeInfo.VOID_TYPE_INFO); - private final DataType dataType; - private final TypeInformation typeInformation; + private final BasicType seaTunnelDataType; + private final TypeInformation flinkTypeInformation; - public BasicTypeConverter(DataType dataType, TypeInformation typeInformation) { - this.dataType = dataType; - this.typeInformation = typeInformation; + public BasicTypeConverter(BasicType seaTunnelDataType, TypeInformation flinkTypeInformation) { + this.seaTunnelDataType = seaTunnelDataType; + this.flinkTypeInformation = flinkTypeInformation; } @Override - public TypeInformation convert(DataType dataType) { - return typeInformation; + public TypeInformation convert(BasicType seaTunnelDataType) { + return flinkTypeInformation; } } diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/FlinkTypeConverter.java b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/FlinkTypeConverter.java index 78e6d5abb88..f5912f3645e 100644 --- a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/FlinkTypeConverter.java +++ b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/FlinkTypeConverter.java @@ -25,16 +25,15 @@ /** * Convert SeaTunnel {@link DataType} to flink type. */ -public interface FlinkTypeConverter - extends Converter, TypeInformation> { +public interface FlinkTypeConverter extends Converter { /** * Convert SeaTunnel {@link DataType} to flink {@link TypeInformation}. * - * @param dataType SeaTunnel {@link DataType} + * @param seaTunnelDataType SeaTunnel {@link DataType} * @return flink {@link TypeInformation} */ @Override - TypeInformation convert(DataType dataType); + T2 convert(T1 seaTunnelDataType); } diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/PojoTypeConverter.java b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/PojoTypeConverter.java new file mode 100644 index 00000000000..efd6e713e81 --- /dev/null +++ b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/PojoTypeConverter.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.translation.flink.types; + +import org.apache.seatunnel.api.table.type.DataType; +import org.apache.seatunnel.api.table.type.PojoType; +import org.apache.seatunnel.translation.flink.utils.TypeConverterUtils; + +import org.apache.commons.lang3.ArrayUtils; +import org.apache.flink.api.java.typeutils.PojoField; +import org.apache.flink.api.java.typeutils.PojoTypeInfo; + +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +public class PojoTypeConverter implements FlinkTypeConverter, PojoTypeInfo> { + + @Override + public PojoTypeInfo convert(PojoType seaTunnelDataType) { + Class pojoClass = seaTunnelDataType.getPojoClass(); + Field[] fields = seaTunnelDataType.getFields(); + DataType[] fieldTypes = seaTunnelDataType.getFieldTypes(); + if (ArrayUtils.isEmpty(fields)) { + return new PojoTypeInfo<>(pojoClass, Collections.emptyList()); + } + List pojoFieldList = new ArrayList<>(fields.length); + for (int i = 0; i < fields.length; i++) { + PojoField pojoField = new PojoField(fields[i], TypeConverterUtils.convertType(fieldTypes[i])); + pojoFieldList.add(pojoField); + } + return new PojoTypeInfo<>(pojoClass, pojoFieldList); + } +} diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/DateTimeType.java b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/TimestampTypeConverter.java similarity index 59% rename from seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/DateTimeType.java rename to seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/TimestampTypeConverter.java index 0dfdacf4c75..b8bf1be7c34 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/DateTimeType.java +++ b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/TimestampTypeConverter.java @@ -15,16 +15,22 @@ * limitations under the License. */ -package org.apache.seatunnel.api.table.type; +package org.apache.seatunnel.translation.flink.types; -public class DateTimeType implements DataType { +import org.apache.seatunnel.api.table.type.TimestampType; - public DateTimeType() { +import org.apache.flink.table.runtime.typeutils.TimestampDataTypeInfo; + +public class TimestampTypeConverter implements FlinkTypeConverter { + + public static final TimestampTypeConverter INSTANCE = new TimestampTypeConverter(); + + private TimestampTypeConverter() { } @Override - public Class getTypeClass() { - return null; + public TimestampDataTypeInfo convert(TimestampType seaTunnelDataType) { + return new TimestampDataTypeInfo(seaTunnelDataType.getPrecision()); } } diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/utils/TypeConverterUtils.java b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/utils/TypeConverterUtils.java new file mode 100644 index 00000000000..ae155306a9a --- /dev/null +++ b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/utils/TypeConverterUtils.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.translation.flink.utils; + +import org.apache.seatunnel.api.table.type.ArrayType; +import org.apache.seatunnel.api.table.type.BasicType; +import org.apache.seatunnel.api.table.type.DataType; +import org.apache.seatunnel.api.table.type.EnumType; +import org.apache.seatunnel.api.table.type.ListType; +import org.apache.seatunnel.api.table.type.MapType; +import org.apache.seatunnel.api.table.type.PojoType; +import org.apache.seatunnel.api.table.type.TimestampType; +import org.apache.seatunnel.translation.flink.types.ArrayTypeConverter; +import org.apache.seatunnel.translation.flink.types.BasicTypeConverter; +import org.apache.seatunnel.translation.flink.types.PojoTypeConverter; +import org.apache.seatunnel.translation.flink.types.TimestampTypeConverter; + +import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.EnumTypeInfo; +import org.apache.flink.api.java.typeutils.ListTypeInfo; +import org.apache.flink.api.java.typeutils.MapTypeInfo; +import org.apache.flink.api.java.typeutils.PojoTypeInfo; + +import java.math.BigDecimal; +import java.math.BigInteger; +import java.util.Date; + +public class TypeConverterUtils { + + @SuppressWarnings("unchecked") + public static TypeInformation convertType(DataType dataType) { + if (dataType instanceof BasicType) { + return (TypeInformation) convertBasicType((BasicType) dataType); + } + if (dataType instanceof TimestampType) { + return (TypeInformation) + TimestampTypeConverter.INSTANCE.convert((TimestampType) dataType); + } + if (dataType instanceof ArrayType) { + return (TypeInformation) convertArrayType((ArrayType) dataType); + } + if (dataType instanceof ListType) { + return (TypeInformation) covertListType((ListType) dataType); + } + if (dataType instanceof EnumType) { + + return (TypeInformation) coverEnumType((EnumType) dataType); + } + if (dataType instanceof MapType) { + return (TypeInformation) convertMapType((MapType) dataType); + } + if (dataType instanceof PojoType) { + return (TypeInformation) convertPojoType((PojoType) dataType); + } + throw new IllegalArgumentException("Unsupported data type: " + dataType); + } + + @SuppressWarnings("unchecked") + public static TypeInformation convertBasicType(BasicType basicType) { + Class physicalTypeClass = basicType.getPhysicalTypeClass(); + if (physicalTypeClass == Boolean.class) { + BasicType booleanBasicType = (BasicType) basicType; + return (TypeInformation) + BasicTypeConverter.BOOLEAN_CONVERTER.convert(booleanBasicType); + } + if (physicalTypeClass == String.class) { + BasicType stringBasicType = (BasicType) basicType; + return (TypeInformation) + BasicTypeConverter.STRING_CONVERTER.convert(stringBasicType); + } + if (physicalTypeClass == Date.class) { + BasicType dateBasicType = (BasicType) basicType; + return (TypeInformation) + BasicTypeConverter.DATE_CONVERTER.convert(dateBasicType); + } + if (physicalTypeClass == Double.class) { + BasicType doubleBasicType = (BasicType) basicType; + return (TypeInformation) + BasicTypeConverter.DOUBLE_CONVERTER.convert(doubleBasicType); + } + if (physicalTypeClass == Integer.class) { + BasicType integerBasicType = (BasicType) basicType; + return (TypeInformation) + BasicTypeConverter.INTEGER_CONVERTER.convert(integerBasicType); + } + if (physicalTypeClass == Long.class) { + BasicType longBasicType = (BasicType) basicType; + return (TypeInformation) + BasicTypeConverter.LONG_CONVERTER.convert(longBasicType); + } + if (physicalTypeClass == Float.class) { + BasicType floatBasicType = (BasicType) basicType; + return (TypeInformation) + BasicTypeConverter.FLOAT_CONVERTER.convert(floatBasicType); + } + if (physicalTypeClass == Byte.class) { + BasicType byteBasicType = (BasicType) basicType; + return (TypeInformation) + BasicTypeConverter.BYTE_CONVERTER.convert(byteBasicType); + } + if (physicalTypeClass == Short.class) { + BasicType shortBasicType = (BasicType) basicType; + return (TypeInformation) + BasicTypeConverter.SHORT_CONVERTER.convert(shortBasicType); + } + if (physicalTypeClass == Character.class) { + BasicType characterBasicType = (BasicType) basicType; + return (TypeInformation) + BasicTypeConverter.CHARACTER_CONVERTER.convert(characterBasicType); + } + if (physicalTypeClass == BigInteger.class) { + BasicType bigIntegerBasicType = (BasicType) basicType; + return (TypeInformation) + BasicTypeConverter.BIG_INTEGER_CONVERTER.convert(bigIntegerBasicType); + } + if (physicalTypeClass == BigDecimal.class) { + BasicType bigDecimalBasicType = (BasicType) basicType; + return (TypeInformation) + BasicTypeConverter.BIG_DECIMAL.convert(bigDecimalBasicType); + } + if (physicalTypeClass == Void.class) { + BasicType voidBasicType = (BasicType) basicType; + return (TypeInformation) + BasicTypeConverter.NULL_CONVERTER.convert(voidBasicType); + } + throw new IllegalArgumentException("Unsupported basic type: " + basicType); + } + + public static BasicArrayTypeInfo convertArrayType(ArrayType arrayType) { + ArrayTypeConverter arrayTypeConverter = new ArrayTypeConverter<>(); + return arrayTypeConverter.convert(arrayType); + } + + public static ListTypeInfo covertListType(ListType listType) { + DataType elementType = listType.getElementType(); + return new ListTypeInfo<>(convertType(elementType)); + } + + public static > EnumTypeInfo coverEnumType(EnumType enumType) { + Class enumClass = enumType.getEnumClass(); + return new EnumTypeInfo<>(enumClass); + } + + public static MapTypeInfo convertMapType(MapType mapType) { + DataType keyType = mapType.getKeyType(); + DataType valueType = mapType.getValueType(); + return new MapTypeInfo<>(convertType(keyType), convertType(valueType)); + } + + public static PojoTypeInfo convertPojoType(PojoType pojoType) { + PojoTypeConverter pojoTypeConverter = new PojoTypeConverter<>(); + return pojoTypeConverter.convert(pojoType); + } +}