Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add complex seatunnel datatype #1807

Merged
merged 3 commits into from
May 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,11 @@ public abstract class Column {
/**
* Data type of the column.
*/
protected final DataType dataType;
protected final DataType<?> dataType;

protected final String comment;

private Column(String name, DataType dataType, String comment) {
private Column(String name, DataType<?> dataType, String comment) {
this.name = name;
this.dataType = dataType;
this.comment = comment;
Expand All @@ -47,7 +47,7 @@ private Column(String name, DataType dataType, String comment) {
/**
* Creates a regular table column that represents physical data.
*/
public static PhysicalColumn physical(String name, DataType dataType) {
public static PhysicalColumn physical(String name, DataType<?> dataType) {
return new PhysicalColumn(name, dataType);
}

Expand All @@ -58,7 +58,7 @@ public static PhysicalColumn physical(String name, DataType dataType) {
* <p>Allows to specify whether the column is virtual or not.
*/
public static MetadataColumn metadata(
String name, DataType dataType, String metadataKey) {
String name, DataType<?> dataType, String metadataKey) {
return new MetadataColumn(name, dataType, metadataKey);
}

Expand All @@ -76,7 +76,7 @@ public static MetadataColumn metadata(
/**
* Returns the data type of this column.
*/
public DataType getDataType() {
public DataType<?> getDataType() {
return this.dataType;
}

Expand All @@ -97,7 +97,7 @@ public Optional<String> getComment() {
/**
* Returns a copy of the column with a replaced {@link DataType}.
*/
public abstract Column copy(DataType newType);
public abstract Column copy(DataType<?> newType);

@Override
public boolean equals(Object o) {
Expand Down Expand Up @@ -127,11 +127,11 @@ public int hashCode() {
*/
public static final class PhysicalColumn extends Column {

private PhysicalColumn(String name, DataType dataType) {
private PhysicalColumn(String name, DataType<?> dataType) {
this(name, dataType, null);
}

private PhysicalColumn(String name, DataType dataType, String comment) {
private PhysicalColumn(String name, DataType<?> dataType, String comment) {
super(name, dataType, comment);
}

Expand All @@ -149,7 +149,7 @@ public boolean isPhysical() {
}

@Override
public Column copy(DataType newDataType) {
public Column copy(DataType<?> newDataType) {
return new PhysicalColumn(name, newDataType, comment);
}
}
Expand All @@ -162,13 +162,13 @@ public static final class MetadataColumn extends Column {
private final String metadataKey;

private MetadataColumn(
String name, DataType dataType, String metadataKey) {
String name, DataType<?> dataType, String metadataKey) {
this(name, dataType, metadataKey, null);
}

private MetadataColumn(
String name,
DataType dataType,
DataType<?> dataType,
String metadataKey,
String comment) {
super(name, dataType, comment);
Expand All @@ -193,7 +193,7 @@ public boolean isPhysical() {
}

@Override
public Column copy(DataType newDataType) {
public Column copy(DataType<?> newDataType) {
return new MetadataColumn(name, newDataType, metadataKey, comment);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
*/
public interface SupportReadingMetadata {

Map<String, DataType> listReadableMetadata(CatalogTable catalogTable);
Map<String, DataType<?>> listReadableMetadata(CatalogTable catalogTable);

void applyReadableMetadata(CatalogTable catalogTable, List<String> metadataKeys, DataType dataType);
void applyReadableMetadata(CatalogTable catalogTable, List<String> metadataKeys, DataType<?> dataType);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package org.apache.seatunnel.api.table.type;

public class ArrayType<T> implements DataType<T> {

private final BasicType<T> elementType;

public ArrayType(BasicType<T> elementType) {
this.elementType = elementType;
}

public BasicType<T> getElementType() {
return elementType;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

package org.apache.seatunnel.api.table.type;

import java.math.BigDecimal;
import java.math.BigInteger;
import java.time.Instant;
import java.util.Date;

public class BasicType<T> implements DataType<T> {
Expand All @@ -29,19 +32,50 @@ public class BasicType<T> implements DataType<T> {
public static final BasicType<Long> LONG = new BasicType<>(Long.class);
public static final BasicType<Float> FLOAT = new BasicType<>(Float.class);
public static final BasicType<Byte> BYTE = new BasicType<>(Byte.class);
public static final BasicType<Short> SHORT = new BasicType<>(Short.class);
public static final BasicType<Character> CHARACTER = new BasicType<>(Character.class);
public static final BasicType<BigInteger> BIG_INTEGER = new BasicType<>(BigInteger.class);
public static final BasicType<BigDecimal> BIG_DECIMAL = new BasicType<>(BigDecimal.class);
public static final BasicType<Instant> INSTANT = new BasicType<>(Instant.class);
public static final BasicType<Void> NULL = new BasicType<>(Void.class);

private final Class<T> typeClass;
/**
* The physical type class.
*/
private final Class<T> physicalTypeClass;

public BasicType(Class<T> typeClass) {
if (typeClass == null) {
throw new IllegalArgumentException("typeClass cannot be null");
public BasicType(Class<T> physicalTypeClass) {
if (physicalTypeClass == null) {
throw new IllegalArgumentException("physicalTypeClass cannot be null");
}
this.typeClass = typeClass;
this.physicalTypeClass = physicalTypeClass;
}

public Class<T> getPhysicalTypeClass() {
return this.physicalTypeClass;
}

@Override
public int hashCode() {
return this.physicalTypeClass.hashCode();
}

@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null) {
return false;
}
BasicType<?> other = (BasicType<?>) obj;
return this.physicalTypeClass.equals(other.physicalTypeClass);
}

@Override
public Class<T> getTypeClass() {
return this.typeClass;
public String toString() {
return "BasicType{" +
"physicalTypeClass=" + physicalTypeClass +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,9 @@
package org.apache.seatunnel.api.table.type;

/**
* Data type of column in SeaTunnel.
* Logic data type of column in SeaTunnel.
*/
public interface DataType<T> {

/**
* The type class.
*
* @return the type class.
*/
Class<T> getTypeClass();

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package org.apache.seatunnel.api.table.type;

public class EnumType<T extends Enum<T>> implements DataType<T> {
private final Class<T> enumClass;

public EnumType(Class<T> enumClass) {
this.enumClass = enumClass;
}

public Class<T> getEnumClass() {
return enumClass;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,13 @@

public class ListType<T> implements DataType<T> {

// todo: use DataType?
private final Class<T> typeClass;
private final DataType<T> elementType;

public ListType(Class<T> typeClass) {
this.typeClass = typeClass;
public ListType(DataType<T> elementType) {
this.elementType = elementType;
}

@Override
public Class<T> getTypeClass() {
return typeClass;
public DataType<T> getElementType() {
return elementType;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package org.apache.seatunnel.api.table.type;

import java.util.Map;

public class MapType<K, V> implements DataType<Map<K, V>> {

private final DataType<K> keyType;
private final DataType<V> valueType;

public MapType(DataType<K> keyType, DataType<V> valueType) {
if (keyType == null) {
throw new IllegalArgumentException("keyType cannot be null");
}
if (valueType == null) {
throw new IllegalArgumentException("valueType cannot be null");
}
this.keyType = keyType;
this.valueType = valueType;
}

public DataType<K> getKeyType() {
return keyType;
}

public DataType<V> getValueType() {
return valueType;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,29 @@

package org.apache.seatunnel.api.table.type;

// todo: we may don't need to pojo type
import java.lang.reflect.Field;

public class PojoType<T> implements DataType<T> {

private final Class<T> typeClass;
private final Class<T> pojoClass;
private final Field[] fields;
private final DataType<?>[] fieldTypes;

public PojoType(Class<T> pojoClass, Field[] fields, DataType<?>[] fieldTypes) {
this.pojoClass = pojoClass;
this.fields = fields;
this.fieldTypes = fieldTypes;
}

public PojoType(Class<T> typeClass) {
this.typeClass = typeClass;
public Class<T> getPojoClass() {
return pojoClass;
}

public Field[] getFields() {
return fields;
}

@Override
public Class<T> getTypeClass() {
return typeClass;
public DataType<?>[] getFieldTypes() {
return fieldTypes;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package org.apache.seatunnel.api.table.type;

import java.sql.Timestamp;

public class TimestampType implements DataType<Timestamp> {

private final int precision;

public TimestampType(int precision) {
this.precision = precision;
}

public int getPrecision() {
return precision;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.translation.flink.types;

import org.apache.seatunnel.api.table.type.ArrayType;
import org.apache.seatunnel.api.table.type.BasicType;

import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;

public class ArrayTypeConverter<T1, T2> implements FlinkTypeConverter<ArrayType<T1>, BasicArrayTypeInfo<T1, T2>> {

@Override
@SuppressWarnings("unchecked")
public BasicArrayTypeInfo<T1, T2> convert(ArrayType<T1> arrayType) {
BasicType<T1> elementType = arrayType.getElementType();
if (BasicType.BOOLEAN.equals(elementType)) {
return (BasicArrayTypeInfo<T1, T2>) BasicArrayTypeInfo.BOOLEAN_ARRAY_TYPE_INFO;
}
if (BasicType.STRING.equals(elementType)) {
return (BasicArrayTypeInfo<T1, T2>) BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO;
}
if (BasicType.DOUBLE.equals(elementType)) {
return (BasicArrayTypeInfo<T1, T2>) BasicArrayTypeInfo.DOUBLE_ARRAY_TYPE_INFO;
}
if (BasicType.INTEGER.equals(elementType)) {
return (BasicArrayTypeInfo<T1, T2>) BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO;
}
if (BasicType.LONG.equals(elementType)) {
return (BasicArrayTypeInfo<T1, T2>) BasicArrayTypeInfo.LONG_ARRAY_TYPE_INFO;
}
if (BasicType.FLOAT.equals(elementType)) {
return (BasicArrayTypeInfo<T1, T2>) BasicArrayTypeInfo.FLOAT_ARRAY_TYPE_INFO;
}
if (BasicType.BYTE.equals(elementType)) {
return (BasicArrayTypeInfo<T1, T2>) BasicArrayTypeInfo.BYTE_ARRAY_TYPE_INFO;
}
if (BasicType.SHORT.equals(elementType)) {
return (BasicArrayTypeInfo<T1, T2>) BasicArrayTypeInfo.SHORT_ARRAY_TYPE_INFO;
}
if (BasicType.CHARACTER.equals(elementType)) {
return (BasicArrayTypeInfo<T1, T2>) BasicArrayTypeInfo.CHAR_ARRAY_TYPE_INFO;
}
throw new IllegalArgumentException("Unsupported basic type: " + elementType);
}
}
Loading