Include more datatypes for avro schema #811

Merged · 5 commits · Sep 13, 2024
@@ -1,6 +1,7 @@
package com.datasqrl.calcite.type;

import static org.apache.calcite.sql.type.SqlTypeName.DECIMAL;
import static org.apache.flink.table.planner.utils.ShortcutUtils.unwrapTypeFactory;

import javax.annotation.Nullable;
import org.apache.calcite.rel.type.RelDataType;
@@ -9,20 +10,24 @@
import org.apache.calcite.rel.type.RelDataTypeSystemImpl;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.sql.type.SqlTypeUtil;
import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
import org.apache.flink.table.planner.calcite.FlinkTypeSystem;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.LocalZonedTimestampType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.logical.utils.LogicalTypeMerging;
import org.apache.flink.util.function.QuadFunction;


//TODO: This is a copy of org.apache.flink.table.planner.calcite.FlinkTypeSystem, we'll port to sqrl types soon
public class SqrlTypeSystem extends RelDataTypeSystemImpl {
public static final SqrlTypeSystem INSTANCE =
new SqrlTypeSystem();
public static final SqrlTypeSystem INSTANCE = new SqrlTypeSystem();
public static final DecimalType DECIMAL_SYSTEM_DEFAULT =
new DecimalType(DecimalType.MAX_PRECISION, 18);

private SqrlTypeSystem() {}

@Override
public int getMaxNumericPrecision() {
// set the maximum precision of a NUMERIC or DECIMAL type to DecimalType.MAX_PRECISION.
@@ -41,7 +46,7 @@ public int getDefaultPrecision(SqlTypeName typeName) {
case VARCHAR:
case VARBINARY:
// Calcite will limit the length of the VARCHAR field to 65536
return Integer.MAX_VALUE;//Integer.MAX_VALUE;
return Integer.MAX_VALUE;
case TIMESTAMP:
// by default we support timestamp with microseconds precision (Timestamp(6))
return TimestampType.DEFAULT_PRECISION;
@@ -60,7 +65,7 @@ public int getMaxPrecision(SqlTypeName typeName) {
case CHAR:
case VARBINARY:
case BINARY:
return Integer.MAX_VALUE;//Integer.MAX_VALUE;
return Integer.MAX_VALUE;

case TIMESTAMP:
// The maximum precision of TIMESTAMP is 3 in Calcite,
@@ -83,6 +88,21 @@ public boolean shouldConvertRaggedUnionTypesToVarying() {
return true;
}

@Override
public RelDataType deriveAvgAggType(
RelDataTypeFactory typeFactory, RelDataType argRelDataType) {
LogicalType argType = FlinkTypeFactory.toLogicalType(argRelDataType);
LogicalType resultType = LogicalTypeMerging.findAvgAggType(argType);
return unwrapTypeFactory(typeFactory).createFieldTypeFromLogicalType(resultType);
}

@Override
public RelDataType deriveSumType(RelDataTypeFactory typeFactory, RelDataType argRelDataType) {
LogicalType argType = FlinkTypeFactory.toLogicalType(argRelDataType);
LogicalType resultType = LogicalTypeMerging.findSumAggType(argType);
return unwrapTypeFactory(typeFactory).createFieldTypeFromLogicalType(resultType);
}

@Override
public RelDataType deriveDecimalPlusType(
RelDataTypeFactory typeFactory, RelDataType type1, RelDataType type2) {
@@ -171,5 +191,4 @@ private boolean canDeriveDecimal(RelDataType type1, RelDataType type2) {
&& SqlTypeUtil.isExactNumeric(type2)
&& (SqlTypeUtil.isDecimal(type1) || SqlTypeUtil.isDecimal(type2));
}

}
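
Note on the new deriveAvgAggType / deriveSumType overrides above: they hand result-type derivation to Flink's LogicalTypeMerging instead of Calcite's defaults, so SUM and AVG over Flink types widen the way Flink expects. A standalone sketch of what that delegation computes (illustrative only; the printed results follow Flink's merging rules rather than anything asserted here):

import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.utils.LogicalTypeMerging;

public class AggTypeSketch {
  public static void main(String[] args) {
    // SUM over DECIMAL(10, 2): Flink widens precision to reduce overflow risk.
    LogicalType sumType = LogicalTypeMerging.findSumAggType(new DecimalType(10, 2));
    // AVG over BIGINT: Flink picks the result type for the average.
    LogicalType avgType = LogicalTypeMerging.findAvgAggType(new BigIntType());
    System.out.println(sumType);
    System.out.println(avgType);
  }
}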
sqrl-planner/pom.xml — 4 changes: 4 additions & 0 deletions
@@ -155,6 +155,10 @@
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc</artifactId>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-avro</artifactId>
</dependency>

</dependencies>
</project>
@@ -1,15 +1,19 @@
package com.datasqrl.io.schema.avro;

import com.datasqrl.canonicalizer.Name;
import com.datasqrl.config.TableConfig;
import com.datasqrl.error.ErrorCollector;
import com.datasqrl.io.schema.flexible.converters.SchemaToRelDataTypeFactory;
import com.datasqrl.io.tables.TableSchema;
import com.google.auto.service.AutoService;
import com.google.common.base.Preconditions;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.Schema;
import org.apache.calcite.rel.type.RelDataType;

@AutoService(SchemaToRelDataTypeFactory.class)
@Slf4j
public class AvroSchemaToRelDataTypeFactory implements SchemaToRelDataTypeFactory {

@Override
@@ -18,12 +22,25 @@ public String getSchemaType() {
}

@Override
public RelDataType map(TableSchema schema, Name tableName, ErrorCollector errors) {
public RelDataType map(TableSchema schema, TableConfig tableConfig, Name tableName, ErrorCollector errors) {
Preconditions.checkArgument(schema instanceof AvroSchemaHolder);
Schema avroSchema = ((AvroSchemaHolder)schema).getSchema();
AvroToRelDataTypeConverter converter = new AvroToRelDataTypeConverter(errors);

boolean legacyTimestampMapping = getLegacyTimestampMapping(tableConfig);
AvroToRelDataTypeConverter converter = new AvroToRelDataTypeConverter(errors, legacyTimestampMapping);
return converter.convert(avroSchema);
}

private boolean getLegacyTimestampMapping(TableConfig tableConfig) {
if (tableConfig == null) return true;
Map<String, Object> configMap = tableConfig.getConnectorConfig().toMap();
Object legacy = configMap.get("timestamp_mapping.legacy");
if (legacy instanceof Boolean) {
return (Boolean) legacy;
} else if (legacy != null) {
log.warn("Expected boolean type for 'timestamp_mapping.legacy'");
}

return true; //default value
}
}
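
The converter factory above reads 'timestamp_mapping.legacy' from the connector options and falls back to true when the key is absent or not a boolean. A minimal sketch of that lookup against a plain options map (the map literal is hypothetical; the key name and fallback mirror getLegacyTimestampMapping):

import java.util.Map;

public class LegacyFlagSketch {
  public static void main(String[] args) {
    // Hypothetical stand-in for tableConfig.getConnectorConfig().toMap()
    Map<String, Object> options = Map.of("timestamp_mapping.legacy", false);
    Object legacy = options.get("timestamp_mapping.legacy");
    // Missing or non-boolean values keep the legacy mapping enabled.
    boolean legacyTimestampMapping = legacy instanceof Boolean ? (Boolean) legacy : true;
    System.out.println(legacyTimestampMapping); // prints: false
  }
}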
@@ -5,135 +5,100 @@
import com.datasqrl.canonicalizer.NamePath;
import com.datasqrl.error.ErrorCode;
import com.datasqrl.error.ErrorCollector;
import com.datasqrl.util.StreamUtil;
import java.util.ArrayList;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import lombok.AllArgsConstructor;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.Schema.Type;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
import org.apache.flink.table.types.DataType;

@AllArgsConstructor
public class AvroToRelDataTypeConverter {

private final TypeFactory typeFactory;
private final ErrorCollector errors;
private final Set<Schema> processedSchemas;
private final boolean legacyTimestampMapping;

public AvroToRelDataTypeConverter(ErrorCollector errors) {
this(TypeFactory.getTypeFactory(), errors);
public AvroToRelDataTypeConverter(ErrorCollector errors, boolean legacyTimestampMapping) {
this(errors, Collections.newSetFromMap(new IdentityHashMap<>()), legacyTimestampMapping);
}

public RelDataType convert(Schema schema) {
return convertAvroSchemaToCalciteType(schema, NamePath.ROOT);
validateSchema(schema, NamePath.ROOT);

DataType dataType = AvroSchemaConverter.convertToDataType(schema.toString(false),
legacyTimestampMapping);

TypeFactory typeFactory = TypeFactory.getTypeFactory();

return typeFactory.createFieldTypeFromLogicalType(
dataType.getLogicalType());
}

private RelDataType convertAvroSchemaToCalciteType(Schema schema, NamePath path) {
RelDataType relType;
private void validateSchema(Schema schema, NamePath path) {
// Check if the schema has already been processed
if (!processedSchemas.add(schema)) {
throw errors.exception(ErrorCode.SCHEMA_ERROR, "Recursive schemas are not supported: %s", path);
}

switch (schema.getType()) {
case UNION:
Contributor comment:

To make the code a bit cleaner and DRYer, use the new Java switch expression, i.e. something like:

relType = switch (schema.getType()) {
    case UNION -> convertUnion(schema, path); // "convertUnion" is a hypothetical extracted helper
    default -> getPrimitive(schema, path);
};
typeCache.add(schema, relType);
return relType;

boolean containsNull = schema.getTypes().stream().anyMatch(type -> type.getType().equals(Type.NULL));
Optional<Schema> innerType;
try {
innerType = StreamUtil.getOnlyElement(
schema.getTypes().stream().filter(type -> !type.getType().equals(Type.NULL)));
} catch (IllegalArgumentException e) {
errors.fatal(ErrorCode.SCHEMA_ERROR, "Only AVRO unions with a single non-null type are supported, but found multiple types at: %",path);
return null;
List<Schema> nonNullTypes = new ArrayList<>();
for (Schema memberSchema : schema.getTypes()) {
if (memberSchema.getType() != Type.NULL) {
nonNullTypes.add(memberSchema);
}
}
if (innerType.isEmpty()) {
errors.fatal(ErrorCode.SCHEMA_ERROR, "Only AVRO unions with a single non-null type are supported, but found no types at: %",path);
return null;

if (nonNullTypes.size() != 1) {
throw errors.exception(ErrorCode.SCHEMA_ERROR,
"Only AVRO unions with a single non-null type are supported, but found %d non-null types at: %s",
nonNullTypes.size(), path);
}
relType = convertAvroSchemaToCalciteType(innerType.get(), path);
if (relType==null) return null;
if (containsNull) relType = typeFactory.createTypeWithNullability(relType, true);
return relType;

Schema innerSchema = nonNullTypes.get(0);
validateSchema(innerSchema, path);
break;
case RECORD:
List<String> fieldNames = new ArrayList<>();
List<RelDataType> fieldTypes = new ArrayList<>();
for (Field field : schema.getFields()) {
relType = convertAvroSchemaToCalciteType(field.schema(), path.concat(
Name.system(field.name())));
if (relType!=null) {
fieldNames.add(field.name());
fieldTypes.add(relType);
}
validateSchema(field.schema(),
path.concat(Name.system(field.name())));
}
if (fieldTypes.isEmpty()) {
errors.fatal(ErrorCode.SCHEMA_ERROR, "AVRO record does not contain any valid field at: %",path);
return null;
}
return notNull(typeFactory.createStructType(fieldTypes, fieldNames));
break;
case ARRAY:
relType = convertAvroSchemaToCalciteType(schema.getElementType(), path);
if (relType==null) return null;
return notNull(typeFactory.createArrayType(relType, -1));
validateSchema(schema.getElementType(), path);
break;
case MAP:
relType = convertAvroSchemaToCalciteType(schema.getValueType(), path);
if (relType==null) return null;
return notNull(typeFactory.createMapType(typeFactory.createSqlType(SqlTypeName.VARCHAR), relType));
default: //primitives
relType = getPrimitive(schema, path);
if (relType!=null) relType = notNull(relType);
return relType;
validateSchema(schema.getValueType(), path);
break;
default: // primitives
validatePrimitive(schema, path);
break;
}
}

private RelDataType getPrimitive(Schema schema, NamePath path) {
private void validatePrimitive(Schema schema, NamePath path) {
switch (schema.getType()) {
case FIXED:
if (logicalTypeEquals(schema, "decimal")) {
return notNull(typeFactory.createSqlType(SqlTypeName.DECIMAL));
} else {
errors.fatal(ErrorCode.SCHEMA_ERROR, "Unrecognized FIXED type in AVRO schema [%s] at: %s", schema, path);
return null;
}
case ENUM:
case STRING:
return typeFactory.createSqlType(SqlTypeName.VARCHAR);
case BYTES:
return typeFactory.createSqlType(SqlTypeName.VARBINARY);
case INT:
if (logicalTypeEquals(schema, "date")) {
return typeFactory.createSqlType(SqlTypeName.DATE);
} else if (logicalTypeEquals(schema, "time-millis")) {
return typeFactory.createSqlType(SqlTypeName.TIME);
} else {
return typeFactory.createSqlType(SqlTypeName.INTEGER);
}
case LONG:
if (logicalTypeEquals(schema, "timestamp-millis")) {
return TypeFactory.makeTimestampType(typeFactory);
} else {
return typeFactory.createSqlType(SqlTypeName.BIGINT);
}
case FLOAT:
return typeFactory.createSqlType(SqlTypeName.FLOAT);
case DOUBLE:
return typeFactory.createSqlType(SqlTypeName.DOUBLE);
case BOOLEAN:
return typeFactory.createSqlType(SqlTypeName.BOOLEAN);
case NULL:
errors.fatal(ErrorCode.SCHEMA_ERROR, "NULL not supported as type at: %s", path);
return null;
return;
default:
errors.fatal(ErrorCode.SCHEMA_ERROR, "Unrecognized AVRO Type [%s] at: %s", schema.getType(), path);
return null;
throw errors.exception(ErrorCode.SCHEMA_ERROR, "Unrecognized AVRO Type [%s] at: %s",
schema.getType(), path);
}
}

private static boolean logicalTypeEquals(Schema schema, String typeName) {
return schema.getLogicalType()!=null && schema.getLogicalType().getName().equalsIgnoreCase(typeName);
}


private RelDataType notNull(RelDataType type) {
return typeFactory.createTypeWithNullability(type, false);
}



}
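
After this refactor, convert() validates the Avro schema (recursion, unions, primitives) and then defers the actual type mapping to Flink's AvroSchemaConverter. A minimal usage sketch (the schema literal is made up for illustration, and ErrorCollector.root() is an assumption about how the surrounding codebase builds collectors):

import com.datasqrl.error.ErrorCollector;
import org.apache.avro.Schema;
import org.apache.calcite.rel.type.RelDataType;

public class AvroConversionSketch {
  public static void main(String[] args) {
    // ErrorCollector.root() is assumed; adjust to however the codebase builds collectors.
    ErrorCollector errors = ErrorCollector.root();
    Schema avroSchema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"Example\",\"fields\":["
            + "{\"name\":\"id\",\"type\":\"long\"},"
            + "{\"name\":\"ts\",\"type\":{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}}]}");
    // true = legacy timestamp mapping, the default when the config flag is absent.
    RelDataType rowType = new AvroToRelDataTypeConverter(errors, true).convert(avroSchema);
    System.out.println(rowType.getFullTypeString());
  }
}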
@@ -4,6 +4,7 @@
package com.datasqrl.io.schema.flexible.converters;

import com.datasqrl.canonicalizer.Name;
import com.datasqrl.config.TableConfig;
import com.datasqrl.error.ErrorCollector;
import com.datasqrl.io.tables.TableSchema;
import com.datasqrl.io.schema.flexible.FlexibleTableConverter;
@@ -22,7 +23,7 @@ public String getSchemaType() {
}

@Override
public RelDataType map(TableSchema schema, Name tableName, ErrorCollector errors) {
public RelDataType map(TableSchema schema, TableConfig tableConfig, Name tableName, ErrorCollector errors) {
Preconditions.checkArgument(schema instanceof FlexibleTableSchemaHolder);
FlexibleTableSchemaHolder holder = (FlexibleTableSchemaHolder)schema;
FlexibleTableConverter converter = new FlexibleTableConverter(holder.getSchema(), tableName);
@@ -1,6 +1,7 @@
package com.datasqrl.io.schema.flexible.converters;

import com.datasqrl.canonicalizer.Name;
import com.datasqrl.config.TableConfig;
import com.datasqrl.error.ErrorCollector;
import com.datasqrl.io.tables.TableSchema;
import com.datasqrl.util.ServiceLoaderDiscovery;
@@ -10,7 +11,7 @@ public interface SchemaToRelDataTypeFactory {

String getSchemaType();

RelDataType map(TableSchema schema, Name tableName, ErrorCollector errors);
RelDataType map(TableSchema schema, TableConfig tableConfig, Name tableName, ErrorCollector errors);

static SchemaToRelDataTypeFactory load(TableSchema schema) {
return ServiceLoaderDiscovery.get(SchemaToRelDataTypeFactory.class,
@@ -52,7 +52,7 @@ public SourceTableDefinition sourceToTable(
dataType = ((RelDataTypeTableSchema) tableSchema).getRelDataType();
} else {
dataType = SchemaToRelDataTypeFactory.load(tableSchema)
.map(tableSchema, tableName, errors);
.map(tableSchema, tableConfig, tableName, errors);
}
if (dataType==null) {
throw errors.exception(ErrorCode.SCHEMA_ERROR, "Could not convert schema for table: %s", tableName);