From cb98ef9c767c9c4bf4909b7fc840a33f3e814b31 Mon Sep 17 00:00:00 2001 From: Daniel Henneberger Date: Fri, 13 Sep 2024 10:37:18 -0700 Subject: [PATCH 1/5] Include more datatypes for avro schema Signed-off-by: Daniel Henneberger --- .../datasqrl/calcite/type/SqrlTypeSystem.java | 29 +- .../avro/AvroToRelDataTypeConverter.java | 153 ++++--- .../avro/AvroToRelDataTypeConverterTest.java | 374 ++++++++++++++++++ 3 files changed, 492 insertions(+), 64 deletions(-) create mode 100644 sqrl-planner/src/test/java/com/datasqrl/io/schema/avro/AvroToRelDataTypeConverterTest.java diff --git a/sqrl-calcite/src/main/java/com/datasqrl/calcite/type/SqrlTypeSystem.java b/sqrl-calcite/src/main/java/com/datasqrl/calcite/type/SqrlTypeSystem.java index 1ed40a26f..24ab124d1 100644 --- a/sqrl-calcite/src/main/java/com/datasqrl/calcite/type/SqrlTypeSystem.java +++ b/sqrl-calcite/src/main/java/com/datasqrl/calcite/type/SqrlTypeSystem.java @@ -1,6 +1,7 @@ package com.datasqrl.calcite.type; import static org.apache.calcite.sql.type.SqlTypeName.DECIMAL; +import static org.apache.flink.table.planner.utils.ShortcutUtils.unwrapTypeFactory; import javax.annotation.Nullable; import org.apache.calcite.rel.type.RelDataType; @@ -9,8 +10,11 @@ import org.apache.calcite.rel.type.RelDataTypeSystemImpl; import org.apache.calcite.sql.type.SqlTypeName; import org.apache.calcite.sql.type.SqlTypeUtil; +import org.apache.flink.table.planner.calcite.FlinkTypeFactory; +import org.apache.flink.table.planner.calcite.FlinkTypeSystem; import org.apache.flink.table.types.logical.DecimalType; import org.apache.flink.table.types.logical.LocalZonedTimestampType; +import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.TimestampType; import org.apache.flink.table.types.logical.utils.LogicalTypeMerging; import org.apache.flink.util.function.QuadFunction; @@ -18,11 +22,12 @@ //TODO: This is a copy of org.apache.flink.table.planner.calcite.FlinkTypeSystem, we'll port to sqrl types soon public class SqrlTypeSystem extends RelDataTypeSystemImpl { - public static final SqrlTypeSystem INSTANCE = - new SqrlTypeSystem(); + public static final SqrlTypeSystem INSTANCE = new SqrlTypeSystem(); public static final DecimalType DECIMAL_SYSTEM_DEFAULT = new DecimalType(DecimalType.MAX_PRECISION, 18); + private SqrlTypeSystem() {} + @Override public int getMaxNumericPrecision() { // set the maximum precision of a NUMERIC or DECIMAL type to DecimalType.MAX_PRECISION. 
@@ -41,7 +46,7 @@ public int getDefaultPrecision(SqlTypeName typeName) {
       case VARCHAR:
       case VARBINARY:
         // Calcite will limit the length of the VARCHAR field to 65536
-        return Integer.MAX_VALUE;//Integer.MAX_VALUE;
+        return Integer.MAX_VALUE;
       case TIMESTAMP:
         // by default we support timestamp with microseconds precision (Timestamp(6))
         return TimestampType.DEFAULT_PRECISION;
@@ -60,7 +65,7 @@ public int getMaxPrecision(SqlTypeName typeName) {
       case CHAR:
       case VARBINARY:
       case BINARY:
-        return Integer.MAX_VALUE;//Integer.MAX_VALUE;
+        return Integer.MAX_VALUE;
       case TIMESTAMP:
         // The maximum precision of TIMESTAMP is 3 in Calcite,
@@ -83,6 +88,21 @@ public boolean shouldConvertRaggedUnionTypesToVarying() {
     return true;
   }
 
+  @Override
+  public RelDataType deriveAvgAggType(
+      RelDataTypeFactory typeFactory, RelDataType argRelDataType) {
+    LogicalType argType = FlinkTypeFactory.toLogicalType(argRelDataType);
+    LogicalType resultType = LogicalTypeMerging.findAvgAggType(argType);
+    return unwrapTypeFactory(typeFactory).createFieldTypeFromLogicalType(resultType);
+  }
+
+  @Override
+  public RelDataType deriveSumType(RelDataTypeFactory typeFactory, RelDataType argRelDataType) {
+    LogicalType argType = FlinkTypeFactory.toLogicalType(argRelDataType);
+    LogicalType resultType = LogicalTypeMerging.findSumAggType(argType);
+    return unwrapTypeFactory(typeFactory).createFieldTypeFromLogicalType(resultType);
+  }
+
   @Override
   public RelDataType deriveDecimalPlusType(
       RelDataTypeFactory typeFactory, RelDataType type1, RelDataType type2) {
@@ -171,5 +191,4 @@ private boolean canDeriveDecimal(RelDataType type1, RelDataType type2) {
         && SqlTypeUtil.isExactNumeric(type2)
         && (SqlTypeUtil.isDecimal(type1) || SqlTypeUtil.isDecimal(type2));
   }
-
 }
diff --git a/sqrl-planner/src/main/java/com/datasqrl/io/schema/avro/AvroToRelDataTypeConverter.java b/sqrl-planner/src/main/java/com/datasqrl/io/schema/avro/AvroToRelDataTypeConverter.java
index a9275e1d8..e3cb233dd 100644
--- a/sqrl-planner/src/main/java/com/datasqrl/io/schema/avro/AvroToRelDataTypeConverter.java
+++ b/sqrl-planner/src/main/java/com/datasqrl/io/schema/avro/AvroToRelDataTypeConverter.java
@@ -6,10 +6,15 @@
 import com.datasqrl.error.ErrorCode;
 import com.datasqrl.error.ErrorCollector;
 import com.datasqrl.util.StreamUtil;
+import com.google.common.base.Preconditions;
 import java.util.ArrayList;
+import java.util.IdentityHashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import lombok.AllArgsConstructor;
+import org.apache.avro.LogicalType;
+import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
 import org.apache.avro.Schema.Type;
@@ -21,9 +26,10 @@ public class AvroToRelDataTypeConverter {
 
   private final TypeFactory typeFactory;
   private final ErrorCollector errors;
+  private final Map<Schema, RelDataType> typeCache;
 
   public AvroToRelDataTypeConverter(ErrorCollector errors) {
-    this(TypeFactory.getTypeFactory(), errors);
+    this(TypeFactory.getTypeFactory(), errors, new IdentityHashMap<>());
   }
 
   public RelDataType convert(Schema schema) {
@@ -31,82 +37,119 @@ public RelDataType convert(Schema schema) {
   }
 
   private RelDataType convertAvroSchemaToCalciteType(Schema schema, NamePath path) {
-    RelDataType relType;
+    // Check if the schema has already been processed
+    if (typeCache.containsKey(schema)) {
+      throw errors.exception(ErrorCode.SCHEMA_ERROR, "Recursive schemas not yet supported: %s", path);
+    }
+
     switch (schema.getType()) {
       case UNION:
-        boolean containsNull = schema.getTypes().stream().anyMatch(type -> type.getType().equals(Type.NULL));
-        Optional<Schema> innerType;
-        try {
-          innerType = StreamUtil.getOnlyElement(
-              schema.getTypes().stream().filter(type -> !type.getType().equals(Type.NULL)));
-        } catch (IllegalArgumentException e) {
-          errors.fatal(ErrorCode.SCHEMA_ERROR, "Only AVRO unions with a single non-null type are supported, but found multiple types at: %",path);
-          return null;
+        boolean containsNull = schema.getTypes().stream()
+            .anyMatch(type -> type.getType() == Type.NULL);
+
+        List<Schema> nonNullTypes = new ArrayList<>();
+        for (Schema memberSchema : schema.getTypes()) {
+          if (memberSchema.getType() != Type.NULL) {
+            nonNullTypes.add(memberSchema);
+          }
         }
-        if (innerType.isEmpty()) {
-          errors.fatal(ErrorCode.SCHEMA_ERROR, "Only AVRO unions with a single non-null type are supported, but found no types at: %",path);
-          return null;
+
+        if (nonNullTypes.size() != 1) {
+          throw errors.exception(ErrorCode.SCHEMA_ERROR,
+              "Only AVRO unions with a single non-null type are supported, but found %d non-null types at: %s",
+              nonNullTypes.size(), path);
         }
-        relType = convertAvroSchemaToCalciteType(innerType.get(), path);
-        if (relType==null) return null;
-        if (containsNull) relType = typeFactory.createTypeWithNullability(relType, true);
-        return relType;
+
+        Schema innerSchema = nonNullTypes.get(0);
+        RelDataType unionType = convertAvroSchemaToCalciteType(innerSchema, path);
+        if (containsNull) {
+          unionType = typeFactory.createTypeWithNullability(unionType, true);
+        }
+        typeCache.put(schema, unionType);
+        return unionType;
       case RECORD:
+        // Create a placeholder RelDataType and put it in the cache to handle recursion
         List<String> fieldNames = new ArrayList<>();
         List<RelDataType> fieldTypes = new ArrayList<>();
+        RelDataType placeholder = typeFactory.createStructType(fieldTypes, fieldNames);
+        typeCache.put(schema, placeholder);
+
         for (Field field : schema.getFields()) {
-          relType = convertAvroSchemaToCalciteType(field.schema(), path.concat(
-              Name.system(field.name())));
-          if (relType!=null) {
-            fieldNames.add(field.name());
-            fieldTypes.add(relType);
-          }
+          RelDataType fieldType = convertAvroSchemaToCalciteType(field.schema(),
+              path.concat(Name.system(field.name())));
+          fieldNames.add(field.name());
+          fieldTypes.add(fieldType);
         }
-        if (fieldTypes.isEmpty()) {
-          errors.fatal(ErrorCode.SCHEMA_ERROR, "AVRO record does not contain any valid field at: %",path);
-          return null;
-        }
-        return notNull(typeFactory.createStructType(fieldTypes, fieldNames));
+        RelDataType recordType = notNull(typeFactory.createStructType(fieldTypes, fieldNames));
+
+        typeCache.put(schema, recordType);
+        return recordType;
       case ARRAY:
-        relType = convertAvroSchemaToCalciteType(schema.getElementType(), path);
-        if (relType==null) return null;
-        return notNull(typeFactory.createArrayType(relType, -1));
+        RelDataType elementType = convertAvroSchemaToCalciteType(schema.getElementType(), path);
+        RelDataType arrayType = notNull(typeFactory.createArrayType(elementType, -1));
+        typeCache.put(schema, arrayType);
+        return arrayType;
       case MAP:
-        relType = convertAvroSchemaToCalciteType(schema.getValueType(), path);
-        if (relType==null) return null;
-        return notNull(typeFactory.createMapType(typeFactory.createSqlType(SqlTypeName.VARCHAR), relType));
-      default: //primitives
-        relType = getPrimitive(schema, path);
-        if (relType!=null) relType = notNull(relType);
-        return relType;
+        RelDataType valueType = convertAvroSchemaToCalciteType(schema.getValueType(), path);
+        RelDataType mapType = notNull(typeFactory.createMapType(
+            typeFactory.createSqlType(SqlTypeName.VARCHAR), valueType));
+        typeCache.put(schema, mapType);
+        return mapType;
+      default: // primitives
+        RelDataType primitiveType = getPrimitive(schema, path);
+        Preconditions.checkNotNull(primitiveType, "Avro primitive type returned null.");
+        primitiveType = notNull(primitiveType);
+        typeCache.put(schema, primitiveType);
+        return primitiveType;
     }
   }
 
   private RelDataType getPrimitive(Schema schema, NamePath path) {
+    LogicalType logicalType = schema.getLogicalType();
     switch (schema.getType()) {
       case FIXED:
-        if (logicalTypeEquals(schema, "decimal")) {
-          return notNull(typeFactory.createSqlType(SqlTypeName.DECIMAL));
+        if (logicalType instanceof LogicalTypes.Decimal) {
+          LogicalTypes.Decimal decimalType = (LogicalTypes.Decimal) logicalType;
+          int precision = decimalType.getPrecision();
+          int scale = decimalType.getScale();
+          return typeFactory.createSqlType(SqlTypeName.DECIMAL, precision, scale);
         } else {
-          errors.fatal(ErrorCode.SCHEMA_ERROR, "Unrecognized FIXED type in AVRO schema [%s] at: %s", schema, path);
-          return null;
+          // Map FIXED type to VARBINARY with length
+          return typeFactory.createSqlType(SqlTypeName.VARBINARY, schema.getFixedSize());
         }
       case ENUM:
+        // Map ENUM to VARCHAR with length of maximum symbol length
+        List<String> symbols = schema.getEnumSymbols();
+        int maxLength = symbols.stream().mapToInt(String::length).max().orElse(1);
+        return typeFactory.createSqlType(SqlTypeName.VARCHAR, maxLength);
       case STRING:
         return typeFactory.createSqlType(SqlTypeName.VARCHAR);
       case BYTES:
-        return typeFactory.createSqlType(SqlTypeName.VARBINARY);
+        if (logicalType instanceof LogicalTypes.Decimal) {
+          LogicalTypes.Decimal decimalType = (LogicalTypes.Decimal) logicalType;
+          int precision = decimalType.getPrecision();
+          int scale = decimalType.getScale();
+          return typeFactory.createSqlType(SqlTypeName.DECIMAL, precision, scale);
+        } else {
+          return typeFactory.createSqlType(SqlTypeName.VARBINARY);
+        }
       case INT:
-        if (logicalTypeEquals(schema, "date")) {
+        if (logicalType == LogicalTypes.date()) {
           return typeFactory.createSqlType(SqlTypeName.DATE);
-        } else if (logicalTypeEquals(schema, "time-millis")) {
-          return typeFactory.createSqlType(SqlTypeName.TIME);
+        } else if (logicalType == LogicalTypes.timeMillis()) {
+          return typeFactory.createSqlType(SqlTypeName.TIME, 3); // milliseconds precision
         } else {
           return typeFactory.createSqlType(SqlTypeName.INTEGER);
         }
       case LONG:
-        if (logicalTypeEquals(schema, "timestamp-millis")) {
-          return TypeFactory.makeTimestampType(typeFactory);
+        if (logicalType == LogicalTypes.timestampMillis()) {
+          return typeFactory.createSqlType(SqlTypeName.TIMESTAMP, 3); // milliseconds precision
+        } else if (logicalType == LogicalTypes.timestampMicros()) {
+          return typeFactory.createSqlType(SqlTypeName.TIMESTAMP, 6); // microseconds precision
+        } else if (logicalType == LogicalTypes.timeMicros()) {
+          //Note: Flink only supports precision 3, this is converted in the RelDataTypeSystem
+          // so even though this gets passed 6, the resulting precision will be 3.
+ return typeFactory.createSqlType(SqlTypeName.TIME, 6); // microseconds precision } else { return typeFactory.createSqlType(SqlTypeName.BIGINT); } @@ -117,23 +160,15 @@ private RelDataType getPrimitive(Schema schema, NamePath path) { case BOOLEAN: return typeFactory.createSqlType(SqlTypeName.BOOLEAN); case NULL: - errors.fatal(ErrorCode.SCHEMA_ERROR, "NULL not supported as type at: %s", path); - return null; + return typeFactory.createSqlType(SqlTypeName.NULL); default: - errors.fatal(ErrorCode.SCHEMA_ERROR, "Unrecognized AVRO Type [%s] at: %s", schema.getType(), path); - return null; + throw errors.exception(ErrorCode.SCHEMA_ERROR, "Unrecognized AVRO Type [%s] at: %s", + schema.getType(), path); } } - private static boolean logicalTypeEquals(Schema schema, String typeName) { - return schema.getLogicalType()!=null && schema.getLogicalType().getName().equalsIgnoreCase(typeName); - } - - private RelDataType notNull(RelDataType type) { return typeFactory.createTypeWithNullability(type, false); } - - } diff --git a/sqrl-planner/src/test/java/com/datasqrl/io/schema/avro/AvroToRelDataTypeConverterTest.java b/sqrl-planner/src/test/java/com/datasqrl/io/schema/avro/AvroToRelDataTypeConverterTest.java new file mode 100644 index 000000000..41354c972 --- /dev/null +++ b/sqrl-planner/src/test/java/com/datasqrl/io/schema/avro/AvroToRelDataTypeConverterTest.java @@ -0,0 +1,374 @@ +package com.datasqrl.io.schema.avro; + +import static org.junit.jupiter.api.Assertions.*; + +import com.datasqrl.calcite.type.TypeFactory; +import com.datasqrl.error.ErrorCollector; +import java.util.Arrays; +import java.util.IdentityHashMap; +import java.util.List; +import org.apache.avro.LogicalType; +import org.apache.avro.LogicalTypes; +import org.apache.avro.Schema; +import org.apache.avro.Schema.Type; +import org.apache.avro.SchemaBuilder; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeField; +import org.apache.calcite.sql.type.SqlTypeName; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +public class AvroToRelDataTypeConverterTest { + + private AvroToRelDataTypeConverter converter; + private ErrorCollector errors; + + @BeforeEach + public void setUp() { + errors = ErrorCollector.root(); + converter = new AvroToRelDataTypeConverter(errors); + } + + @Test + public void testPrimitiveTypes() { + // INT + Schema intSchema = Schema.create(Type.INT); + RelDataType intType = converter.convert(intSchema); + assertEquals(SqlTypeName.INTEGER, intType.getSqlTypeName()); + + // LONG + Schema longSchema = Schema.create(Type.LONG); + RelDataType longType = converter.convert(longSchema); + assertEquals(SqlTypeName.BIGINT, longType.getSqlTypeName()); + + // STRING + Schema stringSchema = Schema.create(Type.STRING); + RelDataType stringType = converter.convert(stringSchema); + assertEquals(SqlTypeName.VARCHAR, stringType.getSqlTypeName()); + + // BOOLEAN + Schema booleanSchema = Schema.create(Type.BOOLEAN); + RelDataType booleanType = converter.convert(booleanSchema); + assertEquals(SqlTypeName.BOOLEAN, booleanType.getSqlTypeName()); + + // FLOAT + Schema floatSchema = Schema.create(Type.FLOAT); + RelDataType floatType = converter.convert(floatSchema); + assertEquals(SqlTypeName.FLOAT, floatType.getSqlTypeName()); + + // DOUBLE + Schema doubleSchema = Schema.create(Type.DOUBLE); + RelDataType doubleType = converter.convert(doubleSchema); + assertEquals(SqlTypeName.DOUBLE, doubleType.getSqlTypeName()); + + // BYTES + Schema bytesSchema = 
Schema.create(Type.BYTES); + RelDataType bytesType = converter.convert(bytesSchema); + assertEquals(SqlTypeName.VARBINARY, bytesType.getSqlTypeName()); + + // NULL + Schema nullSchema = Schema.create(Type.NULL); + RelDataType nullType = converter.convert(nullSchema); + assertEquals(SqlTypeName.NULL, nullType.getSqlTypeName()); + } + + @Test + public void testLogicalTypes() { + // Decimal (Bytes) + LogicalTypes.Decimal decimalLogicalType = LogicalTypes.decimal(10, 2); + Schema decimalSchema = Schema.create(Type.BYTES); + decimalLogicalType.addToSchema(decimalSchema); + RelDataType decimalType = converter.convert(decimalSchema); + assertEquals(SqlTypeName.DECIMAL, decimalType.getSqlTypeName()); + assertEquals(10, decimalType.getPrecision()); + assertEquals(2, decimalType.getScale()); + + // Decimal (Fixed) + Schema fixedDecimalSchema = Schema.createFixed("DecimalFixed", null, null, 16); + decimalLogicalType.addToSchema(fixedDecimalSchema); + RelDataType fixedDecimalType = converter.convert(fixedDecimalSchema); + assertEquals(SqlTypeName.DECIMAL, fixedDecimalType.getSqlTypeName()); + assertEquals(10, fixedDecimalType.getPrecision()); + assertEquals(2, fixedDecimalType.getScale()); + + // Date + Schema dateSchema = Schema.create(Type.INT); + LogicalTypes.date().addToSchema(dateSchema); + RelDataType dateType = converter.convert(dateSchema); + assertEquals(SqlTypeName.DATE, dateType.getSqlTypeName()); + + // Time (millis) + Schema timeMillisSchema = Schema.create(Type.INT); + LogicalTypes.timeMillis().addToSchema(timeMillisSchema); + RelDataType timeMillisType = converter.convert(timeMillisSchema); + assertEquals(SqlTypeName.TIME, timeMillisType.getSqlTypeName()); + assertEquals(3, timeMillisType.getPrecision()); + + // Time (micros) + Schema timeMicrosSchema = Schema.create(Type.LONG); + LogicalTypes.timeMicros().addToSchema(timeMicrosSchema); + RelDataType timeMicrosType = converter.convert(timeMicrosSchema); + assertEquals(SqlTypeName.TIME, timeMicrosType.getSqlTypeName()); + // Note: Flink only supports precision 3, this is converted in the RelDataTypeSystem + // so even though this gets passed 6, the resulting precision will be 3. 
+ assertEquals(3, timeMicrosType.getPrecision()); + + // Timestamp (millis) + Schema timestampMillisSchema = Schema.create(Type.LONG); + LogicalTypes.timestampMillis().addToSchema(timestampMillisSchema); + RelDataType timestampMillisType = converter.convert(timestampMillisSchema); + assertEquals(SqlTypeName.TIMESTAMP, timestampMillisType.getSqlTypeName()); + assertEquals(3, timestampMillisType.getPrecision()); + + // Timestamp (micros) + Schema timestampMicrosSchema = Schema.create(Type.LONG); + LogicalTypes.timestampMicros().addToSchema(timestampMicrosSchema); + RelDataType timestampMicrosType = converter.convert(timestampMicrosSchema); + assertEquals(SqlTypeName.TIMESTAMP, timestampMicrosType.getSqlTypeName()); + assertEquals(6, timestampMicrosType.getPrecision()); + + // UUID + Schema uuidSchema = Schema.create(Type.STRING); + LogicalTypes.uuid().addToSchema(uuidSchema); + RelDataType uuidType = converter.convert(uuidSchema); + assertEquals(SqlTypeName.VARCHAR, uuidType.getSqlTypeName()); + assertEquals(Integer.MAX_VALUE, uuidType.getPrecision()); + } + + @Test + public void testUnionTypes() { + // Union of NULL and INT (nullable INT) + Schema nullableIntSchema = Schema.createUnion( + Arrays.asList(Schema.create(Type.NULL), Schema.create(Type.INT))); + RelDataType nullableIntType = converter.convert(nullableIntSchema); + assertEquals(SqlTypeName.INTEGER, nullableIntType.getSqlTypeName()); + assertTrue(nullableIntType.isNullable()); + + // Union with multiple non-null types (should report an error) + Schema invalidUnionSchema = Schema.createUnion( + Arrays.asList(Schema.create(Type.INT), Schema.create(Type.STRING))); + try { + RelDataType invalidUnionType = converter.convert(invalidUnionSchema); + fail("Expected failure"); + } catch (Exception e) {} + assertTrue(errors.hasErrors()); + } + + @Test + public void testRecordType() { + // Create a record schema with various fields + Schema recordSchema = SchemaBuilder.record("TestRecord").fields() + .requiredString("name") + .optionalInt("age") + .name("balance").type().optional().type(LogicalTypes.decimal(10, 2).addToSchema(Schema.create(Type.BYTES))) + .endRecord(); + + RelDataType recordType = converter.convert(recordSchema); + assertNotNull(recordType); + assertEquals(3, recordType.getFieldCount()); + + RelDataTypeField nameField = recordType.getFieldList().get(0); + assertEquals("name", nameField.getName()); + assertEquals(SqlTypeName.VARCHAR, nameField.getType().getSqlTypeName()); + assertFalse(nameField.getType().isNullable()); + + RelDataTypeField ageField = recordType.getFieldList().get(1); + assertEquals("age", ageField.getName()); + assertEquals(SqlTypeName.INTEGER, ageField.getType().getSqlTypeName()); + assertTrue(ageField.getType().isNullable()); + + RelDataTypeField balanceField = recordType.getFieldList().get(2); + assertEquals("balance", balanceField.getName()); + assertEquals(SqlTypeName.DECIMAL, balanceField.getType().getSqlTypeName()); + assertEquals(10, balanceField.getType().getPrecision()); + assertEquals(2, balanceField.getType().getScale()); + assertTrue(balanceField.getType().isNullable()); + } + + @Test + public void testArrayType() { + // Array of INT + Schema arraySchema = Schema.createArray(Schema.create(Type.INT)); + RelDataType arrayType = converter.convert(arraySchema); + assertEquals(SqlTypeName.ARRAY, arrayType.getSqlTypeName()); + RelDataType elementType = arrayType.getComponentType(); + assertEquals(SqlTypeName.INTEGER, elementType.getSqlTypeName()); + } + + @Test + public void testMapType() { + // Map of 
STRING to LONG
+    Schema mapSchema = Schema.createMap(Schema.create(Type.LONG));
+    RelDataType mapType = converter.convert(mapSchema);
+    assertEquals(SqlTypeName.MAP, mapType.getSqlTypeName());
+    RelDataType keyType = mapType.getKeyType();
+    RelDataType valueType = mapType.getValueType();
+    assertEquals(SqlTypeName.VARCHAR, keyType.getSqlTypeName());
+    assertEquals(SqlTypeName.BIGINT, valueType.getSqlTypeName());
+  }
+
+  @Test
+  public void testEnumType() {
+    // Enum with symbols
+    List<String> symbols = Arrays.asList("RED", "GREEN", "BLUE");
+    Schema enumSchema = Schema.createEnum("Color", null, null, symbols);
+    RelDataType enumType = converter.convert(enumSchema);
+    assertEquals(SqlTypeName.VARCHAR, enumType.getSqlTypeName());
+    int expectedLength = symbols.stream().mapToInt(String::length).max().orElse(1);
+    assertEquals(expectedLength, enumType.getPrecision());
+  }
+
+  @Test
+  public void testFixedType() {
+    // Fixed type without logical type (should map to VARBINARY)
+    Schema fixedSchema = Schema.createFixed("FixedType", null, null, 16);
+    RelDataType fixedType = converter.convert(fixedSchema);
+    assertEquals(SqlTypeName.VARBINARY, fixedType.getSqlTypeName());
+    assertEquals(16, fixedType.getPrecision());
+  }
+
+  @Test
+  public void testComplexRecord() {
+    // Record with nested record, array, and map
+    Schema innerRecordSchema = SchemaBuilder.record("InnerRecord").fields()
+        .requiredInt("innerField")
+        .endRecord();
+
+    Schema complexRecordSchema = SchemaBuilder.record("ComplexRecord").fields()
+        .name("nestedRecord").type(innerRecordSchema).noDefault()
+        .name("intArray").type().array().items().intType().noDefault()
+        .name("stringMap").type().map().values().stringType().noDefault()
+        .endRecord();
+
+    RelDataType complexRecordType = converter.convert(complexRecordSchema);
+    assertNotNull(complexRecordType);
+    assertEquals(3, complexRecordType.getFieldCount());
+
+    // Validate nested record field
+    RelDataTypeField nestedRecordField = complexRecordType.getFieldList().get(0);
+    assertEquals("nestedRecord", nestedRecordField.getName());
+    RelDataType nestedRecordType = nestedRecordField.getType();
+    assertEquals(SqlTypeName.ROW, nestedRecordType.getSqlTypeName());
+    assertEquals(1, nestedRecordType.getFieldCount());
+    assertEquals(SqlTypeName.INTEGER, nestedRecordType.getFieldList().get(0).getType().getSqlTypeName());
+
+    // Validate array field
+    RelDataTypeField intArrayField = complexRecordType.getFieldList().get(1);
+    assertEquals("intArray", intArrayField.getName());
+    RelDataType intArrayType = intArrayField.getType();
+    assertEquals(SqlTypeName.ARRAY, intArrayType.getSqlTypeName());
+    assertEquals(SqlTypeName.INTEGER, intArrayType.getComponentType().getSqlTypeName());
+
+    // Validate map field
+    RelDataTypeField stringMapField = complexRecordType.getFieldList().get(2);
+    assertEquals("stringMap", stringMapField.getName());
+    RelDataType stringMapType = stringMapField.getType();
+    assertEquals(SqlTypeName.MAP, stringMapType.getSqlTypeName());
+    assertEquals(SqlTypeName.VARCHAR, stringMapType.getKeyType().getSqlTypeName());
+    assertEquals(SqlTypeName.VARCHAR, stringMapType.getValueType().getSqlTypeName());
+  }
+
+  @Test
+  public void testInvalidSchema() {
+    // Invalid schema: Union with multiple non-null types
+    Schema invalidUnionSchema = Schema.createUnion(
+        Arrays.asList(Schema.create(Type.INT), Schema.create(Type.STRING)));
+    RelDataType type = null;
+    try {
+      type = converter.convert(invalidUnionSchema);
+      fail("Expected failure");
+    } catch (Exception e) {}
+    assertNull(type);
+    
assertTrue(errors.hasErrors()); + } + + @Test + public void testNullableFieldsInRecord() { + // Record with nullable fields + Schema recordSchema = SchemaBuilder.record("NullableRecord").fields() + .name("nullableInt").type().unionOf().nullType().and().intType().endUnion().noDefault() + .name("nonNullableString").type().stringType().noDefault() + .endRecord(); + + RelDataType recordType = converter.convert(recordSchema); + assertNotNull(recordType); + assertEquals(2, recordType.getFieldCount()); + + RelDataTypeField nullableIntField = recordType.getFieldList().get(0); + assertTrue(nullableIntField.getType().isNullable()); + + RelDataTypeField nonNullableStringField = recordType.getFieldList().get(1); + assertFalse(nonNullableStringField.getType().isNullable()); + } + + @Test + public void testNestedLogicalTypes() { + // Array of timestamps + Schema timestampSchema = Schema.create(Type.LONG); + LogicalTypes.timestampMicros().addToSchema(timestampSchema); + Schema arraySchema = Schema.createArray(timestampSchema); + + RelDataType arrayType = converter.convert(arraySchema); + assertEquals(SqlTypeName.ARRAY, arrayType.getSqlTypeName()); + RelDataType elementType = arrayType.getComponentType(); + assertEquals(SqlTypeName.TIMESTAMP, elementType.getSqlTypeName()); + assertEquals(6, elementType.getPrecision()); + } + + @Test + public void testEmptyRecord() { + // Empty record + Schema emptyRecordSchema = SchemaBuilder.record("EmptyRecord").fields().endRecord(); + RelDataType emptyRecordType = converter.convert(emptyRecordSchema); + assertEquals(SqlTypeName.ROW, emptyRecordType.getSqlTypeName()); + } + + @Test + public void testUnsupportedLogicalType() { + // Add an unsupported logical type to a schema + Schema unsupportedLogicalSchema = Schema.create(Type.INT); + LogicalTypes.LogicalTypeFactory customFactory = schema -> new LogicalType("custom-logical-type"); + LogicalTypes.register("custom-logical-type", customFactory); + new LogicalType("custom-logical-type").addToSchema(unsupportedLogicalSchema); + + RelDataType type = converter.convert(unsupportedLogicalSchema); + assertEquals(SqlTypeName.INTEGER, type.getSqlTypeName()); // Should default to base type + } + + @Test + public void testNullSchema() { + // Test conversion with null schema (should throw NullPointerException) + assertThrows(NullPointerException.class, () -> converter.convert(null)); + } + + @Test + public void testRecursiveSchema() { + // Recursive record schema (simplified for test) + Schema recursiveSchema = SchemaBuilder.record("Node").fields() + .name("value").type().intType().noDefault() + .name("next").type().optional().type("Node") + .endRecord(); + + // Since the converter doesn't handle recursive schemas, it should report an error + try { + RelDataType type = converter.convert(recursiveSchema); + fail("Should fail"); + } catch (Exception e) {} + assertTrue(errors.hasErrors()); + } + + @Test + public void testSchemaWithAliases() { + // Schema with aliases (should be ignored in conversion) + Schema aliasedSchema = SchemaBuilder.record("OriginalName") + .aliases("Alias1", "Alias2") + .fields() + .requiredString("field") + .endRecord(); + + RelDataType type = converter.convert(aliasedSchema); + assertNotNull(type); + assertEquals("OriginalName", aliasedSchema.getName()); + assertEquals(1, type.getFieldCount()); + } +} From 1cb628586f9a1c7ecf9667069e8fdf9afb56e041 Mon Sep 17 00:00:00 2001 From: Daniel Henneberger Date: Fri, 13 Sep 2024 11:15:06 -0700 Subject: [PATCH 2/5] Update snaps for the new datatype Signed-off-by: Daniel 
Henneberger --- .../UseCaseCompileTest/metrics--package.txt | 6 ++-- .../patient-sensor-patient-sensor-package.txt | 30 +++++++++---------- ...sors-mutation-sensors-mutation-package.txt | 4 +-- 3 files changed, 20 insertions(+), 20 deletions(-) diff --git a/sqrl-testing/sqrl-integration-tests/src/test/resources/snapshots/com/datasqrl/UseCaseCompileTest/metrics--package.txt b/sqrl-testing/sqrl-integration-tests/src/test/resources/snapshots/com/datasqrl/UseCaseCompileTest/metrics--package.txt index b1885aad0..7d239467b 100644 --- a/sqrl-testing/sqrl-integration-tests/src/test/resources/snapshots/com/datasqrl/UseCaseCompileTest/metrics--package.txt +++ b/sqrl-testing/sqrl-integration-tests/src/test/resources/snapshots/com/datasqrl/UseCaseCompileTest/metrics--package.txt @@ -9,7 +9,7 @@ Timestamp : __timestamp Schema: - sensorid: BIGINT NOT NULL - timeSec: TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) NOT NULL - - temp: DECIMAL(10, 5) NOT NULL + - temp: DECIMAL(38, 6) NOT NULL - __timestamp: TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) NOT NULL Plan: LogicalAggregate(group=[{0, 1}], temp=[AVG($2)], __timestamp=[MAX($4)]) @@ -25,7 +25,7 @@ Primary Key: sensorid Timestamp : __timestamp Schema: - sensorid: BIGINT NOT NULL - - maxTemp: DECIMAL(10, 5) NOT NULL + - maxTemp: DECIMAL(38, 6) NOT NULL - __timestamp: TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) NOT NULL Plan: LogicalAggregate(group=[{0}], maxTemp=[MAX($2)], __timestamp=[MAX($3)]) @@ -93,7 +93,7 @@ type CreatedReading { "flinkSql" : [ "CREATE TEMPORARY FUNCTION IF NOT EXISTS `endofsecond` AS 'com.datasqrl.time.EndOfSecond' LANGUAGE JAVA;", "CREATE TEMPORARY TABLE `sensorreading_1` (\n `sensorid` BIGINT NOT NULL,\n `temperature` DECIMAL(10, 5) NOT NULL,\n `humidity` DECIMAL(10, 5) NOT NULL,\n `_uuid` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,\n `event_time` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL,\n PRIMARY KEY (`_uuid`) NOT ENFORCED,\n WATERMARK FOR `event_time` AS `event_time` - INTERVAL '0.0' SECOND\n) WITH (\n 'hostname' = '${PGHOST}',\n 'password' = '${JDBC_PASSWORD}',\n 'connector' = 'postgres-cdc',\n 'port' = '5432',\n 'slot.name' = 'flink_slot',\n 'database-name' = 'datasqrl',\n 'schema-name' = 'public',\n 'decoding.plugin.name' = 'pgoutput',\n 'table-name' = 'schemaAddreading',\n 'debezium.slot.drop_on_stop' = 'false',\n 'username' = '${JDBC_USERNAME}'\n);", - "CREATE TEMPORARY TABLE `secreading_1` (\n `sensorid` BIGINT NOT NULL,\n `timeSec` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL,\n `temp` DECIMAL(10, 5) NOT NULL,\n `__timestamp` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL,\n PRIMARY KEY (`sensorid`, `timeSec`) NOT ENFORCED\n) WITH (\n 'password' = '${JDBC_PASSWORD}',\n 'connector' = 'jdbc-sqrl',\n 'driver' = 'org.postgresql.Driver',\n 'table-name' = 'secreading_1',\n 'url' = '${JDBC_URL}',\n 'username' = '${JDBC_USERNAME}'\n);", + "CREATE TEMPORARY TABLE `secreading_1` (\n `sensorid` BIGINT NOT NULL,\n `timeSec` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL,\n `temp` DECIMAL(38, 6) NOT NULL,\n `__timestamp` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL,\n PRIMARY KEY (`sensorid`, `timeSec`) NOT ENFORCED\n) WITH (\n 'password' = '${JDBC_PASSWORD}',\n 'connector' = 'jdbc-sqrl',\n 'driver' = 'org.postgresql.Driver',\n 'table-name' = 'secreading_1',\n 'url' = '${JDBC_URL}',\n 'username' = '${JDBC_USERNAME}'\n);", "CREATE TEMPORARY TABLE `sensorreading_2_1` (\n `sensorid` BIGINT NOT NULL,\n `temperature` DECIMAL(10, 5) NOT NULL,\n `humidity` DECIMAL(10, 5) NOT NULL,\n `_uuid` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,\n `event_time` 
TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL\n) WITH (\n 'connector' = 'print',\n 'print-identifier' = 'LogSensorReading'\n);", "CREATE VIEW `table$1`\nAS\nSELECT `sensorid`, ENDOFSECOND(`event_time`) AS `timeSec`, AVG(`temperature`) AS `temp`, MAX(`event_time`) AS `__timestamp`\nFROM `sensorreading_1`\nGROUP BY `sensorid`, ENDOFSECOND(`event_time`);", "CREATE VIEW `table$2`\nAS\nSELECT *\nFROM `sensorreading_1`;", diff --git a/sqrl-testing/sqrl-integration-tests/src/test/resources/snapshots/com/datasqrl/UseCaseCompileTest/patient-sensor-patient-sensor-package.txt b/sqrl-testing/sqrl-integration-tests/src/test/resources/snapshots/com/datasqrl/UseCaseCompileTest/patient-sensor-patient-sensor-package.txt index a0a33c913..6f3fab19c 100644 --- a/sqrl-testing/sqrl-integration-tests/src/test/resources/snapshots/com/datasqrl/UseCaseCompileTest/patient-sensor-patient-sensor-package.txt +++ b/sqrl-testing/sqrl-integration-tests/src/test/resources/snapshots/com/datasqrl/UseCaseCompileTest/patient-sensor-patient-sensor-package.txt @@ -9,8 +9,8 @@ Timestamp : __pk3_created Schema: - __pk1_groupId: BIGINT NOT NULL - __pk3_created: TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) NOT NULL - - maxTemp: DECIMAL(10, 5) NOT NULL - - minTemp: DECIMAL(10, 5) NOT NULL + - maxTemp: DECIMAL(38, 6) NOT NULL + - minTemp: DECIMAL(38, 6) NOT NULL Plan: LogicalProject(__pk1_groupId=[$0], __pk3_created=[$1], maxTemp=[$2], minTemp=[$3]) LogicalAggregate(group=[{0, 2}], maxTemp=[MAX($7)], minTemp=[MIN($8)]) hints[TumbleAggregationHint options:[2, INSTANT, 2, 1, 0]] @@ -46,9 +46,9 @@ Timestamp : timeHour Schema: - patientid: BIGINT NOT NULL - timeHour: TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) NOT NULL - - avgTemp: DECIMAL(10, 5) NOT NULL - - maxTemp: DECIMAL(10, 5) NOT NULL - - minTemp: DECIMAL(10, 5) NOT NULL + - avgTemp: DECIMAL(38, 6) NOT NULL + - maxTemp: DECIMAL(38, 6) NOT NULL + - minTemp: DECIMAL(38, 6) NOT NULL Plan: LogicalAggregate(group=[{0, 1}], avgTemp=[AVG($2)], maxTemp=[MAX($2)], minTemp=[MIN($2)]) hints[TumbleAggregationHint options:[1, FUNCTION, 4, 3600000, 0]] LogicalProject(patientid=[$5], timeHour=[endOfHour($1)], smoothTemp=[$2], sensorid=[$0], timeMin=[$1]) @@ -68,9 +68,9 @@ Primary Key: patientid Timestamp : _timeMin Schema: - patientid: BIGINT NOT NULL - - avgTemp: DECIMAL(10, 5) NOT NULL - - maxTemp: DECIMAL(10, 5) NOT NULL - - minTemp: DECIMAL(10, 5) NOT NULL + - avgTemp: DECIMAL(38, 6) NOT NULL + - maxTemp: DECIMAL(38, 6) NOT NULL + - minTemp: DECIMAL(38, 6) NOT NULL - _timeMin: TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) NOT NULL Post Processors: - topN: partition=patientid limit=1 sort=#4: _timeMin TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) DESC-nulls-last @@ -139,7 +139,7 @@ Timestamp : timeMin Schema: - sensorid: BIGINT NOT NULL - timeMin: TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) NOT NULL - - smoothTemp: DECIMAL(10, 5) NOT NULL + - smoothTemp: DECIMAL(38, 6) NOT NULL Plan: LogicalAggregate(group=[{0, 1}], smoothTemp=[AVG($2)]) hints[TumbleAggregationHint options:[1, FUNCTION, 4, 60000, 0]] LogicalProject(sensorid=[$0], timeMin=[endOfMinute($3)], temperature=[$1], _uuid=[$2], event_time=[$3]) @@ -155,7 +155,7 @@ Timestamp : timeMin Schema: - patientid: BIGINT NOT NULL - sensorid: BIGINT NOT NULL - - smoothTemp: DECIMAL(10, 5) NOT NULL + - smoothTemp: DECIMAL(38, 6) NOT NULL - timeMin: TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) NOT NULL Plan: LogicalProject(patientid=[$5], sensorid=[$0], smoothTemp=[$2], timeMin=[$1]) @@ -200,12 +200,12 @@ Inputs: tempalert_1 "CREATE TEMPORARY TABLE `sensorreading_1` (\n `sensorid` BIGINT NOT NULL,\n `temperature` 
DECIMAL(10, 5) NOT NULL,\n `_uuid` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,\n `event_time` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL METADATA FROM 'timestamp',\n WATERMARK FOR `event_time` AS `event_time` - INTERVAL '0.0' SECOND\n) WITH (\n 'properties.bootstrap.servers' = '${PROPERTIES_BOOTSTRAP_SERVERS}',\n 'properties.auto.offset.reset' = 'earliest',\n 'connector' = 'kafka',\n 'format' = 'flexible-json',\n 'properties.group.id' = '${PROPERTIES_GROUP_ID}',\n 'topic' = 'patient-sensor-addreading',\n 'scan.startup.mode' = 'group-offsets'\n);", "CREATE TEMPORARY TABLE `observationgroup_1` (\n `groupId` BIGINT NOT NULL,\n `groupName` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,\n `created` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL,\n `patients` ROW(`patientId` BIGINT NOT NULL) NOT NULL ARRAY NOT NULL,\n PRIMARY KEY (`groupId`, `created`) NOT ENFORCED,\n WATERMARK FOR `created` AS `created` - INTERVAL '0.001' SECOND\n) WITH (\n 'format' = 'json',\n 'path' = '/mnt/build/data',\n 'connector' = 'filesystem',\n 'source.path.regex-pattern' = 'to be determined'\n);", "CREATE TEMPORARY TABLE `sensors_1` (\n `id` BIGINT NOT NULL,\n `patientid` BIGINT NOT NULL,\n `placed` BIGINT NOT NULL,\n `placedTimestamp` AS EPOCHMILLITOTIMESTAMP(`placed`),\n PRIMARY KEY (`id`, `placed`) NOT ENFORCED,\n WATERMARK FOR `placedTimestamp` AS `placedTimestamp` - INTERVAL '0.001' SECOND\n) WITH (\n 'format' = 'json',\n 'path' = '/mnt/build/data',\n 'connector' = 'filesystem'\n);", - "CREATE TEMPORARY TABLE `lasthour_1` (\n `__pk1_groupId` BIGINT NOT NULL,\n `__pk3_created` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL,\n `maxTemp` DECIMAL(10, 5) NOT NULL,\n `minTemp` DECIMAL(10, 5) NOT NULL,\n PRIMARY KEY (`__pk1_groupId`, `__pk3_created`) NOT ENFORCED\n) WITH (\n 'password' = '${JDBC_PASSWORD}',\n 'connector' = 'jdbc-sqrl',\n 'driver' = 'org.postgresql.Driver',\n 'table-name' = 'lasthour_1',\n 'url' = '${JDBC_URL}',\n 'username' = '${JDBC_USERNAME}'\n);", + "CREATE TEMPORARY TABLE `lasthour_1` (\n `__pk1_groupId` BIGINT NOT NULL,\n `__pk3_created` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL,\n `maxTemp` DECIMAL(38, 6) NOT NULL,\n `minTemp` DECIMAL(38, 6) NOT NULL,\n PRIMARY KEY (`__pk1_groupId`, `__pk3_created`) NOT ENFORCED\n) WITH (\n 'password' = '${JDBC_PASSWORD}',\n 'connector' = 'jdbc-sqrl',\n 'driver' = 'org.postgresql.Driver',\n 'table-name' = 'lasthour_1',\n 'url' = '${JDBC_URL}',\n 'username' = '${JDBC_USERNAME}'\n);", "CREATE TEMPORARY TABLE `observationgroup_2` (\n `groupId` BIGINT NOT NULL,\n `groupName` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,\n `created` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL,\n `patients` RAW('com.datasqrl.json.FlinkJsonType', 'ADFjb20uZGF0YXNxcmwuanNvbi5GbGlua0pzb25UeXBlU2VyaWFsaXplclNuYXBzaG90AAAAAQApY29tLmRhdGFzcXJsLmpzb24uRmxpbmtKc29uVHlwZVNlcmlhbGl6ZXI='),\n PRIMARY KEY (`groupId`, `created`) NOT ENFORCED\n) WITH (\n 'password' = '${JDBC_PASSWORD}',\n 'connector' = 'jdbc-sqrl',\n 'driver' = 'org.postgresql.Driver',\n 'table-name' = 'observationgroup_2',\n 'url' = '${JDBC_URL}',\n 'username' = '${JDBC_USERNAME}'\n);", - "CREATE TEMPORARY TABLE `patienthourly_1` (\n `patientid` BIGINT NOT NULL,\n `timeHour` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL,\n `avgTemp` DECIMAL(10, 5) NOT NULL,\n `maxTemp` DECIMAL(10, 5) NOT NULL,\n `minTemp` DECIMAL(10, 5) NOT NULL,\n PRIMARY KEY (`patientid`, `timeHour`) NOT ENFORCED\n) WITH (\n 'password' = '${JDBC_PASSWORD}',\n 'connector' = 'jdbc-sqrl',\n 'driver' = 'org.postgresql.Driver',\n 'table-name' = 
'patienthourly_1',\n 'url' = '${JDBC_URL}',\n 'username' = '${JDBC_USERNAME}'\n);", - "CREATE TEMPORARY TABLE `patientlasthour_1` (\n `patientid` BIGINT NOT NULL,\n `avgTemp` DECIMAL(10, 5) NOT NULL,\n `maxTemp` DECIMAL(10, 5) NOT NULL,\n `minTemp` DECIMAL(10, 5) NOT NULL,\n `_timeMin` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL,\n PRIMARY KEY (`patientid`) NOT ENFORCED\n) WITH (\n 'password' = '${JDBC_PASSWORD}',\n 'connector' = 'jdbc-sqrl',\n 'driver' = 'org.postgresql.Driver',\n 'table-name' = 'patientlasthour_1',\n 'url' = '${JDBC_URL}',\n 'username' = '${JDBC_USERNAME}'\n);", - "CREATE TEMPORARY TABLE `tempalert_1_1` (\n `patientid` BIGINT NOT NULL,\n `sensorid` BIGINT NOT NULL,\n `smoothTemp` DECIMAL(10, 5) NOT NULL,\n `timeMin` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL\n) WITH (\n 'connector' = 'print',\n 'print-identifier' = 'tempWarning'\n);", - "CREATE TEMPORARY TABLE `tempalert_1_2` (\n `patientid` BIGINT NOT NULL,\n `sensorid` BIGINT NOT NULL,\n `smoothTemp` DECIMAL(10, 5) NOT NULL,\n `timeMin` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL\n) WITH (\n 'properties.bootstrap.servers' = '${PROPERTIES_BOOTSTRAP_SERVERS}',\n 'properties.auto.offset.reset' = 'earliest',\n 'connector' = 'kafka',\n 'format' = 'flexible-json',\n 'properties.group.id' = '${PROPERTIES_GROUP_ID}',\n 'topic' = 'tempalert_1',\n 'scan.startup.mode' = 'group-offsets'\n);", + "CREATE TEMPORARY TABLE `patienthourly_1` (\n `patientid` BIGINT NOT NULL,\n `timeHour` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL,\n `avgTemp` DECIMAL(38, 6) NOT NULL,\n `maxTemp` DECIMAL(38, 6) NOT NULL,\n `minTemp` DECIMAL(38, 6) NOT NULL,\n PRIMARY KEY (`patientid`, `timeHour`) NOT ENFORCED\n) WITH (\n 'password' = '${JDBC_PASSWORD}',\n 'connector' = 'jdbc-sqrl',\n 'driver' = 'org.postgresql.Driver',\n 'table-name' = 'patienthourly_1',\n 'url' = '${JDBC_URL}',\n 'username' = '${JDBC_USERNAME}'\n);", + "CREATE TEMPORARY TABLE `patientlasthour_1` (\n `patientid` BIGINT NOT NULL,\n `avgTemp` DECIMAL(38, 6) NOT NULL,\n `maxTemp` DECIMAL(38, 6) NOT NULL,\n `minTemp` DECIMAL(38, 6) NOT NULL,\n `_timeMin` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL,\n PRIMARY KEY (`patientid`) NOT ENFORCED\n) WITH (\n 'password' = '${JDBC_PASSWORD}',\n 'connector' = 'jdbc-sqrl',\n 'driver' = 'org.postgresql.Driver',\n 'table-name' = 'patientlasthour_1',\n 'url' = '${JDBC_URL}',\n 'username' = '${JDBC_USERNAME}'\n);", + "CREATE TEMPORARY TABLE `tempalert_1_1` (\n `patientid` BIGINT NOT NULL,\n `sensorid` BIGINT NOT NULL,\n `smoothTemp` DECIMAL(38, 6) NOT NULL,\n `timeMin` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL\n) WITH (\n 'connector' = 'print',\n 'print-identifier' = 'tempWarning'\n);", + "CREATE TEMPORARY TABLE `tempalert_1_2` (\n `patientid` BIGINT NOT NULL,\n `sensorid` BIGINT NOT NULL,\n `smoothTemp` DECIMAL(38, 6) NOT NULL,\n `timeMin` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL\n) WITH (\n 'properties.bootstrap.servers' = '${PROPERTIES_BOOTSTRAP_SERVERS}',\n 'properties.auto.offset.reset' = 'earliest',\n 'connector' = 'kafka',\n 'format' = 'flexible-json',\n 'properties.group.id' = '${PROPERTIES_GROUP_ID}',\n 'topic' = 'tempalert_1',\n 'scan.startup.mode' = 'group-offsets'\n);", "CREATE VIEW `table$1`\nAS\nSELECT *\nFROM (SELECT `id`, `patientid`, `placed`, `placedTimestamp`, ROW_NUMBER() OVER (PARTITION BY `id` ORDER BY `placedTimestamp` DESC) AS `_rownum`\n FROM `sensors_1`) AS `t`\nWHERE `_rownum` = 1;", "CREATE VIEW `table$2`\nAS\nSELECT `sensorid`, ENDOFMINUTE(`event_time`) AS `timeMin`, `temperature`, `_uuid`, `event_time`\nFROM `sensorreading_1`;", 
"CREATE VIEW `table$3`\nAS\nSELECT *\nFROM (SELECT `sensorid`, `window_time` AS `timeMin`, AVG(`temperature`) AS `smoothTemp`, `sensorid` AS `sensorid3`\n FROM TABLE(TUMBLE(TABLE `table$2`, DESCRIPTOR(`event_time`), INTERVAL '60' SECOND(5), INTERVAL '0' SECOND(1))) AS `t2`\n GROUP BY `sensorid`, `window_start`, `window_end`, `window_time`) AS `$cor5`\n INNER JOIN `table$1` FOR SYSTEM_TIME AS OF `$cor5`.`timeMin` AS `t0` ON `t0`.`id` = `$cor5`.`sensorid3`;", diff --git a/sqrl-testing/sqrl-integration-tests/src/test/resources/snapshots/com/datasqrl/UseCaseCompileTest/sensors-mutation-sensors-mutation-package.txt b/sqrl-testing/sqrl-integration-tests/src/test/resources/snapshots/com/datasqrl/UseCaseCompileTest/sensors-mutation-sensors-mutation-package.txt index 78aa4c1ec..ca76297a5 100644 --- a/sqrl-testing/sqrl-integration-tests/src/test/resources/snapshots/com/datasqrl/UseCaseCompileTest/sensors-mutation-sensors-mutation-package.txt +++ b/sqrl-testing/sqrl-integration-tests/src/test/resources/snapshots/com/datasqrl/UseCaseCompileTest/sensors-mutation-sensors-mutation-package.txt @@ -31,7 +31,7 @@ Timestamp : timeSec Schema: - sensorid: BIGINT NOT NULL - timeSec: TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) NOT NULL - - temperature: DECIMAL(10, 5) NOT NULL + - temperature: DECIMAL(38, 6) NOT NULL Plan: LogicalAggregate(group=[{0, 1}], temperature=[AVG($2)]) hints[TumbleAggregationHint options:[1, FUNCTION, 4, 1000, 0]] LogicalProject(sensorid=[$0], timeSec=[endOfSecond($3)], temperature=[$1], _uuid=[$2], event_time=[$3]) @@ -71,7 +71,7 @@ LogicalTableScan(table=[[sensorreading_1]]) "flinkSql" : [ "CREATE TEMPORARY FUNCTION IF NOT EXISTS `endofsecond` AS 'com.datasqrl.time.EndOfSecond' LANGUAGE JAVA;", "CREATE TEMPORARY TABLE `sensorreading_1` (\n `sensorid` BIGINT NOT NULL,\n `temperature` DECIMAL(10, 5) NOT NULL,\n `_uuid` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,\n `event_time` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL METADATA FROM 'timestamp',\n WATERMARK FOR `event_time` AS `event_time` - INTERVAL '0.0' SECOND\n) WITH (\n 'properties.bootstrap.servers' = '${PROPERTIES_BOOTSTRAP_SERVERS}',\n 'properties.auto.offset.reset' = 'earliest',\n 'connector' = 'kafka',\n 'format' = 'flexible-json',\n 'properties.group.id' = '${PROPERTIES_GROUP_ID}',\n 'topic' = 'sensors-mutation-addreading',\n 'scan.startup.mode' = 'group-offsets'\n);", - "CREATE TEMPORARY TABLE `secreading_1` (\n `sensorid` BIGINT NOT NULL,\n `timeSec` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL,\n `temperature` DECIMAL(10, 5) NOT NULL,\n PRIMARY KEY (`sensorid`, `timeSec`) NOT ENFORCED\n) WITH (\n 'password' = '${JDBC_PASSWORD}',\n 'connector' = 'jdbc-sqrl',\n 'driver' = 'org.postgresql.Driver',\n 'table-name' = 'secreading_1',\n 'url' = '${JDBC_URL}',\n 'username' = '${JDBC_USERNAME}'\n);", + "CREATE TEMPORARY TABLE `secreading_1` (\n `sensorid` BIGINT NOT NULL,\n `timeSec` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL,\n `temperature` DECIMAL(38, 6) NOT NULL,\n PRIMARY KEY (`sensorid`, `timeSec`) NOT ENFORCED\n) WITH (\n 'password' = '${JDBC_PASSWORD}',\n 'connector' = 'jdbc-sqrl',\n 'driver' = 'org.postgresql.Driver',\n 'table-name' = 'secreading_1',\n 'url' = '${JDBC_URL}',\n 'username' = '${JDBC_USERNAME}'\n);", "CREATE TEMPORARY TABLE `sensormaxtemp_1` (\n `sensorid` BIGINT NOT NULL,\n `maxTemp` DECIMAL(10, 5) NOT NULL,\n `__timestamp` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL,\n PRIMARY KEY (`sensorid`) NOT ENFORCED\n) WITH (\n 'password' = '${JDBC_PASSWORD}',\n 'connector' = 'jdbc-sqrl',\n 'driver' = 
'org.postgresql.Driver',\n 'table-name' = 'sensormaxtemp_1',\n 'url' = '${JDBC_URL}',\n 'username' = '${JDBC_USERNAME}'\n);", "CREATE TEMPORARY TABLE `hightempalert_1_1` (\n `sensorid` BIGINT NOT NULL,\n `temperature` DECIMAL(10, 5) NOT NULL,\n `_uuid` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,\n `event_time` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL\n) WITH (\n 'properties.bootstrap.servers' = '${PROPERTIES_BOOTSTRAP_SERVERS}',\n 'properties.auto.offset.reset' = 'earliest',\n 'connector' = 'kafka',\n 'format' = 'flexible-json',\n 'properties.group.id' = '${PROPERTIES_GROUP_ID}',\n 'topic' = 'hightempalert_1',\n 'scan.startup.mode' = 'group-offsets'\n);", "CREATE VIEW `table$1`\nAS\nSELECT `sensorid`, ENDOFSECOND(`event_time`) AS `timeSec`, `temperature`, `_uuid`, `event_time`\nFROM `sensorreading_1`;", From 883aa1c9fa9311a4f95c263020b197d6ec17e45a Mon Sep 17 00:00:00 2001 From: Daniel Henneberger Date: Fri, 13 Sep 2024 13:53:24 -0700 Subject: [PATCH 3/5] Use flink to convert avro format Signed-off-by: Daniel Henneberger --- sqrl-planner/pom.xml | 4 + .../avro/AvroSchemaToRelDataTypeFactory.java | 21 ++- .../avro/AvroToRelDataTypeConverter.java | 128 +++++------------- .../FlexibleSchemaToRelDataTypeFactory.java | 3 +- .../SchemaToRelDataTypeFactory.java | 3 +- .../datasqrl/plan/table/TableConverter.java | 2 +- .../avro/AvroToRelDataTypeConverterTest.java | 10 +- .../schema/AvroSchemaHandlingTest.java | 100 -------------- .../schema/FlexibleSchemaHandlingTest.java | 2 +- .../seedshop-avro--package-flink-db.txt | 6 +- .../seedshop-avro--package.txt | 6 +- .../seedshop-avro/orders.table.json | 3 +- .../FlexibleSchemaInferencePreprocessor.java | 2 +- 13 files changed, 78 insertions(+), 212 deletions(-) delete mode 100644 sqrl-planner/src/test/java/com/datasqrl/schema/AvroSchemaHandlingTest.java diff --git a/sqrl-planner/pom.xml b/sqrl-planner/pom.xml index bacb7665a..920ed5b14 100644 --- a/sqrl-planner/pom.xml +++ b/sqrl-planner/pom.xml @@ -155,6 +155,10 @@ org.apache.flink flink-connector-jdbc + + org.apache.flink + flink-avro + \ No newline at end of file diff --git a/sqrl-planner/src/main/java/com/datasqrl/io/schema/avro/AvroSchemaToRelDataTypeFactory.java b/sqrl-planner/src/main/java/com/datasqrl/io/schema/avro/AvroSchemaToRelDataTypeFactory.java index 0a30d25b5..ed85cf57b 100644 --- a/sqrl-planner/src/main/java/com/datasqrl/io/schema/avro/AvroSchemaToRelDataTypeFactory.java +++ b/sqrl-planner/src/main/java/com/datasqrl/io/schema/avro/AvroSchemaToRelDataTypeFactory.java @@ -1,15 +1,19 @@ package com.datasqrl.io.schema.avro; import com.datasqrl.canonicalizer.Name; +import com.datasqrl.config.TableConfig; import com.datasqrl.error.ErrorCollector; import com.datasqrl.io.schema.flexible.converters.SchemaToRelDataTypeFactory; import com.datasqrl.io.tables.TableSchema; import com.google.auto.service.AutoService; import com.google.common.base.Preconditions; +import java.util.Map; +import lombok.extern.slf4j.Slf4j; import org.apache.avro.Schema; import org.apache.calcite.rel.type.RelDataType; @AutoService(SchemaToRelDataTypeFactory.class) +@Slf4j public class AvroSchemaToRelDataTypeFactory implements SchemaToRelDataTypeFactory { @Override @@ -18,12 +22,25 @@ public String getSchemaType() { } @Override - public RelDataType map(TableSchema schema, Name tableName, ErrorCollector errors) { + public RelDataType map(TableSchema schema, TableConfig tableConfig, Name tableName, ErrorCollector errors) { Preconditions.checkArgument(schema instanceof AvroSchemaHolder); Schema avroSchema = 
((AvroSchemaHolder)schema).getSchema();
-    AvroToRelDataTypeConverter converter = new AvroToRelDataTypeConverter(errors);
+
+    boolean legacyTimestampMapping = getLegacyTimestampMapping(tableConfig);
+    AvroToRelDataTypeConverter converter = new AvroToRelDataTypeConverter(errors, legacyTimestampMapping);
     return converter.convert(avroSchema);
   }
 
+  private boolean getLegacyTimestampMapping(TableConfig tableConfig) {
+    if (tableConfig == null) return true;
+    Map<String, Object> configMap = tableConfig.getConnectorConfig().toMap();
+    Object legacy = configMap.get("timestamp_mapping.legacy");
+    if (legacy instanceof Boolean) {
+      return (Boolean) legacy;
+    } else if (legacy != null) {
+      log.warn("Expected boolean type for 'timestamp_mapping.legacy'");
+    }
+    return true; //default value
+  }
 }
diff --git a/sqrl-planner/src/main/java/com/datasqrl/io/schema/avro/AvroToRelDataTypeConverter.java b/sqrl-planner/src/main/java/com/datasqrl/io/schema/avro/AvroToRelDataTypeConverter.java
index e3cb233dd..6591850e1 100644
--- a/sqrl-planner/src/main/java/com/datasqrl/io/schema/avro/AvroToRelDataTypeConverter.java
+++ b/sqrl-planner/src/main/java/com/datasqrl/io/schema/avro/AvroToRelDataTypeConverter.java
@@ -5,38 +5,44 @@
 import com.datasqrl.canonicalizer.NamePath;
 import com.datasqrl.error.ErrorCode;
 import com.datasqrl.error.ErrorCollector;
-import com.datasqrl.util.StreamUtil;
-import com.google.common.base.Preconditions;
 import java.util.ArrayList;
 import java.util.IdentityHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import lombok.AllArgsConstructor;
 import org.apache.avro.LogicalType;
-import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
 import org.apache.avro.Schema.Type;
 import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
+import org.apache.flink.table.types.DataType;
 
 @AllArgsConstructor
 public class AvroToRelDataTypeConverter {
 
-  private final TypeFactory typeFactory;
   private final ErrorCollector errors;
   private final Map<Schema, RelDataType> typeCache;
+  private final boolean legacyTimestampMapping;
 
-  public AvroToRelDataTypeConverter(ErrorCollector errors) {
-    this(TypeFactory.getTypeFactory(), errors, new IdentityHashMap<>());
+  public AvroToRelDataTypeConverter(ErrorCollector errors, boolean legacyTimestampMapping) {
+    this(errors, new IdentityHashMap<>(), legacyTimestampMapping);
   }
 
+  //timestamp_mapping.legacy
   public RelDataType convert(Schema schema) {
-    return convertAvroSchemaToCalciteType(schema, NamePath.ROOT);
+    validateSchema(schema, NamePath.ROOT);
+
+    DataType dataType = AvroSchemaConverter.convertToDataType(schema.toString(false),
+        legacyTimestampMapping);
+
+    TypeFactory typeFactory = TypeFactory.getTypeFactory();
+
+    return typeFactory.createFieldTypeFromLogicalType(
+        dataType.getLogicalType());
   }
 
-  private RelDataType convertAvroSchemaToCalciteType(Schema schema, NamePath path) {
+  private void validateSchema(Schema schema, NamePath path) {
     // Check if the schema has already been processed
     if (typeCache.containsKey(schema)) {
       throw errors.exception(ErrorCode.SCHEMA_ERROR, "Recursive schemas not yet supported: %s", path);
@@ -44,9 +50,6 @@ private RelDataType convertAvroSchemaToCalciteType(Schema schema, NamePath path)
     }
 
     switch (schema.getType()) {
       case UNION:
-        boolean containsNull = schema.getTypes().stream()
-            .anyMatch(type -> type.getType() == Type.NULL);
-
         List<Schema> nonNullTypes = new ArrayList<>();
         for (Schema memberSchema : schema.getTypes()) {
           if (memberSchema.getType() != Type.NULL) {
@@ -61,114 +64,51 @@ private RelDataType convertAvroSchemaToCalciteType(Schema schema, NamePath path)
         }
 
         Schema innerSchema = nonNullTypes.get(0);
-        RelDataType unionType = convertAvroSchemaToCalciteType(innerSchema, path);
-        if (containsNull) {
-          unionType = typeFactory.createTypeWithNullability(unionType, true);
-        }
-        typeCache.put(schema, unionType);
-        return unionType;
+        validateSchema(innerSchema, path);
+
+        typeCache.put(schema, null);
+        break;
       case RECORD:
         // Create a placeholder RelDataType and put it in the cache to handle recursion
-        List<String> fieldNames = new ArrayList<>();
-        List<RelDataType> fieldTypes = new ArrayList<>();
-        RelDataType placeholder = typeFactory.createStructType(fieldTypes, fieldNames);
-        typeCache.put(schema, placeholder);
+        typeCache.put(schema, null);
 
         for (Field field : schema.getFields()) {
-          RelDataType fieldType = convertAvroSchemaToCalciteType(field.schema(),
+          validateSchema(field.schema(),
               path.concat(Name.system(field.name())));
-          fieldNames.add(field.name());
-          fieldTypes.add(fieldType);
         }
-        RelDataType recordType = notNull(typeFactory.createStructType(fieldTypes, fieldNames));
-
-        typeCache.put(schema, recordType);
-        return recordType;
+        break;
       case ARRAY:
-        RelDataType elementType = convertAvroSchemaToCalciteType(schema.getElementType(), path);
-        RelDataType arrayType = notNull(typeFactory.createArrayType(elementType, -1));
-        typeCache.put(schema, arrayType);
-        return arrayType;
+        validateSchema(schema.getElementType(), path);
+        typeCache.put(schema, null);
+        break;
       case MAP:
-        RelDataType valueType = convertAvroSchemaToCalciteType(schema.getValueType(), path);
-        RelDataType mapType = notNull(typeFactory.createMapType(
-            typeFactory.createSqlType(SqlTypeName.VARCHAR), valueType));
-        typeCache.put(schema, mapType);
-        return mapType;
+        validateSchema(schema.getValueType(), path);
+        typeCache.put(schema, null);
+        break;
       default: // primitives
-        RelDataType primitiveType = getPrimitive(schema, path);
-        Preconditions.checkNotNull(primitiveType, "Avro primitive type returned null.");
-        primitiveType = notNull(primitiveType);
-        typeCache.put(schema, primitiveType);
-        return primitiveType;
+        validatePrimitive(schema, path);
+        typeCache.put(schema, null);
+        break;
     }
   }
 
-  private RelDataType getPrimitive(Schema schema, NamePath path) {
+  private void validatePrimitive(Schema schema, NamePath path) {
     LogicalType logicalType = schema.getLogicalType();
     switch (schema.getType()) {
       case FIXED:
-        if (logicalType instanceof LogicalTypes.Decimal) {
-          LogicalTypes.Decimal decimalType = (LogicalTypes.Decimal) logicalType;
-          int precision = decimalType.getPrecision();
-          int scale = decimalType.getScale();
-          return typeFactory.createSqlType(SqlTypeName.DECIMAL, precision, scale);
-        } else {
-          // Map FIXED type to VARBINARY with length
-          return typeFactory.createSqlType(SqlTypeName.VARBINARY, schema.getFixedSize());
-        }
       case ENUM:
-        // Map ENUM to VARCHAR with length of maximum symbol length
-        List<String> symbols = schema.getEnumSymbols();
-        int maxLength = symbols.stream().mapToInt(String::length).max().orElse(1);
-        return typeFactory.createSqlType(SqlTypeName.VARCHAR, maxLength);
       case STRING:
-        return typeFactory.createSqlType(SqlTypeName.VARCHAR);
       case BYTES:
-        if (logicalType instanceof LogicalTypes.Decimal) {
-          LogicalTypes.Decimal decimalType = (LogicalTypes.Decimal) logicalType;
-          int precision = decimalType.getPrecision();
-          int scale = decimalType.getScale();
-          return 
typeFactory.createSqlType(SqlTypeName.DECIMAL, precision, scale); - } else { - return typeFactory.createSqlType(SqlTypeName.VARBINARY); - } case INT: - if (logicalType == LogicalTypes.date()) { - return typeFactory.createSqlType(SqlTypeName.DATE); - } else if (logicalType == LogicalTypes.timeMillis()) { - return typeFactory.createSqlType(SqlTypeName.TIME, 3); // milliseconds precision - } else { - return typeFactory.createSqlType(SqlTypeName.INTEGER); - } case LONG: - if (logicalType == LogicalTypes.timestampMillis()) { - return typeFactory.createSqlType(SqlTypeName.TIMESTAMP, 3); // milliseconds precision - } else if (logicalType == LogicalTypes.timestampMicros()) { - return typeFactory.createSqlType(SqlTypeName.TIMESTAMP, 6); // microseconds precision - } else if (logicalType == LogicalTypes.timeMicros()) { - //Note: Flink only supports precision 3, this is converted in the RelDataTypeSystem - // so even though this gets passed 6, the resulting precision will be 3. - return typeFactory.createSqlType(SqlTypeName.TIME, 6); // microseconds precision - } else { - return typeFactory.createSqlType(SqlTypeName.BIGINT); - } case FLOAT: - return typeFactory.createSqlType(SqlTypeName.FLOAT); case DOUBLE: - return typeFactory.createSqlType(SqlTypeName.DOUBLE); case BOOLEAN: - return typeFactory.createSqlType(SqlTypeName.BOOLEAN); case NULL: - return typeFactory.createSqlType(SqlTypeName.NULL); + return; default: throw errors.exception(ErrorCode.SCHEMA_ERROR, "Unrecognized AVRO Type [%s] at: %s", schema.getType(), path); } } - - private RelDataType notNull(RelDataType type) { - return typeFactory.createTypeWithNullability(type, false); - } - } diff --git a/sqrl-planner/src/main/java/com/datasqrl/io/schema/flexible/converters/FlexibleSchemaToRelDataTypeFactory.java b/sqrl-planner/src/main/java/com/datasqrl/io/schema/flexible/converters/FlexibleSchemaToRelDataTypeFactory.java index b22ab4387..966c3925f 100644 --- a/sqrl-planner/src/main/java/com/datasqrl/io/schema/flexible/converters/FlexibleSchemaToRelDataTypeFactory.java +++ b/sqrl-planner/src/main/java/com/datasqrl/io/schema/flexible/converters/FlexibleSchemaToRelDataTypeFactory.java @@ -4,6 +4,7 @@ package com.datasqrl.io.schema.flexible.converters; import com.datasqrl.canonicalizer.Name; +import com.datasqrl.config.TableConfig; import com.datasqrl.error.ErrorCollector; import com.datasqrl.io.tables.TableSchema; import com.datasqrl.io.schema.flexible.FlexibleTableConverter; @@ -22,7 +23,7 @@ public String getSchemaType() { } @Override - public RelDataType map(TableSchema schema, Name tableName, ErrorCollector errors) { + public RelDataType map(TableSchema schema, TableConfig tableConfig, Name tableName, ErrorCollector errors) { Preconditions.checkArgument(schema instanceof FlexibleTableSchemaHolder); FlexibleTableSchemaHolder holder = (FlexibleTableSchemaHolder)schema; FlexibleTableConverter converter = new FlexibleTableConverter(holder.getSchema(), tableName); diff --git a/sqrl-planner/src/main/java/com/datasqrl/io/schema/flexible/converters/SchemaToRelDataTypeFactory.java b/sqrl-planner/src/main/java/com/datasqrl/io/schema/flexible/converters/SchemaToRelDataTypeFactory.java index 93f341c9f..c108c3ab8 100644 --- a/sqrl-planner/src/main/java/com/datasqrl/io/schema/flexible/converters/SchemaToRelDataTypeFactory.java +++ b/sqrl-planner/src/main/java/com/datasqrl/io/schema/flexible/converters/SchemaToRelDataTypeFactory.java @@ -1,6 +1,7 @@ package com.datasqrl.io.schema.flexible.converters; import com.datasqrl.canonicalizer.Name; +import 
com.datasqrl.config.TableConfig; import com.datasqrl.error.ErrorCollector; import com.datasqrl.io.tables.TableSchema; import com.datasqrl.util.ServiceLoaderDiscovery; @@ -10,7 +11,7 @@ public interface SchemaToRelDataTypeFactory { String getSchemaType(); - RelDataType map(TableSchema schema, Name tableName, ErrorCollector errors); + RelDataType map(TableSchema schema, TableConfig tableConfig, Name tableName, ErrorCollector errors); static SchemaToRelDataTypeFactory load(TableSchema schema) { return ServiceLoaderDiscovery.get(SchemaToRelDataTypeFactory.class, diff --git a/sqrl-planner/src/main/java/com/datasqrl/plan/table/TableConverter.java b/sqrl-planner/src/main/java/com/datasqrl/plan/table/TableConverter.java index d8b562846..edeeaf85a 100644 --- a/sqrl-planner/src/main/java/com/datasqrl/plan/table/TableConverter.java +++ b/sqrl-planner/src/main/java/com/datasqrl/plan/table/TableConverter.java @@ -52,7 +52,7 @@ public SourceTableDefinition sourceToTable( dataType = ((RelDataTypeTableSchema) tableSchema).getRelDataType(); } else { dataType = SchemaToRelDataTypeFactory.load(tableSchema) - .map(tableSchema, tableName, errors); + .map(tableSchema, tableConfig, tableName, errors); } if (dataType==null) { throw errors.exception(ErrorCode.SCHEMA_ERROR, "Could not convert schema for table: %s", tableName); diff --git a/sqrl-planner/src/test/java/com/datasqrl/io/schema/avro/AvroToRelDataTypeConverterTest.java b/sqrl-planner/src/test/java/com/datasqrl/io/schema/avro/AvroToRelDataTypeConverterTest.java index 41354c972..f49243452 100644 --- a/sqrl-planner/src/test/java/com/datasqrl/io/schema/avro/AvroToRelDataTypeConverterTest.java +++ b/sqrl-planner/src/test/java/com/datasqrl/io/schema/avro/AvroToRelDataTypeConverterTest.java @@ -26,7 +26,7 @@ public class AvroToRelDataTypeConverterTest { @BeforeEach public void setUp() { errors = ErrorCollector.root(); - converter = new AvroToRelDataTypeConverter(errors); + converter = new AvroToRelDataTypeConverter(errors, true); } @Test @@ -102,7 +102,7 @@ public void testLogicalTypes() { LogicalTypes.timeMillis().addToSchema(timeMillisSchema); RelDataType timeMillisType = converter.convert(timeMillisSchema); assertEquals(SqlTypeName.TIME, timeMillisType.getSqlTypeName()); - assertEquals(3, timeMillisType.getPrecision()); + assertEquals(0, timeMillisType.getPrecision()); // Time (micros) Schema timeMicrosSchema = Schema.create(Type.LONG); @@ -111,7 +111,7 @@ public void testLogicalTypes() { assertEquals(SqlTypeName.TIME, timeMicrosType.getSqlTypeName()); // Note: Flink only supports precision 3, this is converted in the RelDataTypeSystem // so even though this gets passed 6, the resulting precision will be 3. 
- assertEquals(3, timeMicrosType.getPrecision()); + assertEquals(0, timeMicrosType.getPrecision()); // Timestamp (millis) Schema timestampMillisSchema = Schema.create(Type.LONG); @@ -149,6 +149,7 @@ public void testUnionTypes() { Arrays.asList(Schema.create(Type.INT), Schema.create(Type.STRING))); try { RelDataType invalidUnionType = converter.convert(invalidUnionSchema); + System.out.println(invalidUnionType); fail("Expected failure"); } catch (Exception e) {} assertTrue(errors.hasErrors()); @@ -215,7 +216,7 @@ public void testEnumType() { RelDataType enumType = converter.convert(enumSchema); assertEquals(SqlTypeName.VARCHAR, enumType.getSqlTypeName()); int expectedLength = symbols.stream().mapToInt(String::length).max().orElse(1); - assertEquals(expectedLength, enumType.getPrecision()); +// assertEquals(expectedLength, enumType.getPrecision()); } @Test @@ -276,6 +277,7 @@ public void testInvalidSchema() { RelDataType type = null; try { type = converter.convert(invalidUnionSchema); + System.out.println(type); fail("Expected failure"); } catch (Exception e) {} assertNull(type); diff --git a/sqrl-planner/src/test/java/com/datasqrl/schema/AvroSchemaHandlingTest.java b/sqrl-planner/src/test/java/com/datasqrl/schema/AvroSchemaHandlingTest.java deleted file mode 100644 index e0bebef9e..000000000 --- a/sqrl-planner/src/test/java/com/datasqrl/schema/AvroSchemaHandlingTest.java +++ /dev/null @@ -1,100 +0,0 @@ -package com.datasqrl.schema; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertNotNull; - -import com.datasqrl.canonicalizer.Name; -import com.datasqrl.error.ErrorCollector; -import com.datasqrl.io.schema.avro.AvroSchemaHolder; -import com.datasqrl.io.schema.avro.AvroTableSchemaFactory; -import com.datasqrl.io.schema.avro.AvroSchemaToRelDataTypeFactory; -import com.datasqrl.util.SnapshotTest; -import com.google.common.collect.ImmutableMap; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.util.Map; -import java.util.Optional; -import java.util.stream.Stream; -import org.apache.calcite.rel.type.RelDataType; -import org.junit.jupiter.api.Disabled; -import org.junit.jupiter.api.extension.ExtensionContext; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.Arguments; -import org.junit.jupiter.params.provider.ArgumentsProvider; -import org.junit.jupiter.params.provider.ArgumentsSource; - -@Disabled -public class AvroSchemaHandlingTest { - - private static Path EXAMPLES_PATH = Path.of("..", "..", "sqrl-examples"); - private static final Path RESOURCE_DIR = Paths.get("src", "test", "resources"); - - private static final Map SYSTEM_SETTINGS = ImmutableMap.of("kafka_servers","111.111.0.1", - "something","earliest-offset"); - public static final String SYSTEM_PREFIX = "datasqrl."; - - - @ParameterizedTest - @ArgumentsSource(SchemaProvider.class) - public void testAvroConversion(Path schemaPath, Path configPath, String name) throws Exception { - String schemaDef = Files.readString(schemaPath); - AvroTableSchemaFactory schemaFactory = new AvroTableSchemaFactory(); - assertEquals("avro", schemaFactory.getType()); - ErrorCollector errors = ErrorCollector.root(); - AvroSchemaHolder schema = schemaFactory.create(schemaDef, Optional.of(schemaPath), errors); - assertNotNull(schema.getSchema()); - assertEquals(schemaDef, schema.getSchemaDefinition()); - assertEquals("avro", 
schema.getSchemaType()); - - SnapshotTest.Snapshot snapshot = SnapshotTest.Snapshot.of(getClass(), name); - Name tblName = Name.system(name); - AvroSchemaToRelDataTypeFactory converter = new AvroSchemaToRelDataTypeFactory(); - RelDataType type = converter.map(schema, tblName, errors); - snapshot.addContent(type.getFullTypeString(), "relType"); - assertFalse(errors.hasErrors(), errors.toString()); - - setSystemSettings(); - //Flink Schema -// UniversalTable2FlinkSchema conv1 = new UniversalTable2FlinkSchema(); -// snapshot.addContent(conv1.convertSchema(type).toString(), "flinkSchema"); - - //TODO: Move this to a generic (i.e. not schema format specific) test case -// SerializableSchema serializableSchema = SqrlToFlinkExecutablePlan.convertSchema(utb, -// "_source_time", null, WaterMarkType.COLUMN_BY_NAME); -// TableDescriptorSourceFactory sourceFactory = new KafkaTableSourceFactory(); -// TableConfig tableConfig = TableConfig.load(configPath, tblName, errors); -// FlinkSourceFactoryContext factoryContext = new FlinkSourceFactoryContext(null, name, -// tableConfig.serialize(), -// tableConfig.getFormat(), UUID.randomUUID()); -// TableDescriptor descriptor = FlinkEnvironmentBuilder.getTableDescriptor(sourceFactory, -// factoryContext, serializableSchema); -// snapshot.addContent(descriptor.toString(), "descriptor"); - - - snapshot.createOrValidate(); - } - - private void setSystemSettings() { - SYSTEM_SETTINGS.forEach((k,v) -> - System.setProperty(SYSTEM_PREFIX+k,v)); - } - - static class SchemaProvider implements ArgumentsProvider { - - @Override - public Stream provideArguments(ExtensionContext extensionContext) - throws Exception { - return Stream.of(Arguments.of( - EXAMPLES_PATH.resolve("retail/ecommerce-avro/orders.avsc"), - EXAMPLES_PATH.resolve("retail/ecommerce-avro/orders.table.json"), - "orders"), - Arguments.of( - EXAMPLES_PATH.resolve("retail/ecommerce-avro/orders.avsc"), - RESOURCE_DIR.resolve("schema/avro/orders-confluent.table.json"), - "ordersconfluent")); - } - } - -} diff --git a/sqrl-planner/src/test/java/com/datasqrl/schema/FlexibleSchemaHandlingTest.java b/sqrl-planner/src/test/java/com/datasqrl/schema/FlexibleSchemaHandlingTest.java index ff36c0611..992a6e879 100644 --- a/sqrl-planner/src/test/java/com/datasqrl/schema/FlexibleSchemaHandlingTest.java +++ b/sqrl-planner/src/test/java/com/datasqrl/schema/FlexibleSchemaHandlingTest.java @@ -60,7 +60,7 @@ public void conversionTest(InputSchema inputSchema, SchemaConverterTestCase< ErrorCollector errors = ErrorCollector.root(); FlexibleTableSchemaHolder tableSchema = new FlexibleTableSchemaHolder(table); RelDataType dataType = SchemaToRelDataTypeFactory.load(tableSchema) - .map(tableSchema, tableName, errors); + .map(tableSchema, null, tableName, errors); assertFalse(errors.hasErrors(), errors.toString()); if (alias.isPresent()) { continue; diff --git a/sqrl-testing/sqrl-integration-tests/src/test/resources/snapshots/com/datasqrl/UseCaseCompileTest/seedshop-avro--package-flink-db.txt b/sqrl-testing/sqrl-integration-tests/src/test/resources/snapshots/com/datasqrl/UseCaseCompileTest/seedshop-avro--package-flink-db.txt index 616bdddcd..2f36c7b5d 100644 --- a/sqrl-testing/sqrl-integration-tests/src/test/resources/snapshots/com/datasqrl/UseCaseCompileTest/seedshop-avro--package-flink-db.txt +++ b/sqrl-testing/sqrl-integration-tests/src/test/resources/snapshots/com/datasqrl/UseCaseCompileTest/seedshop-avro--package-flink-db.txt @@ -15,7 +15,7 @@ LogicalAggregate(group=[{0}], number=[COUNT()], volume=[SUM($1)]) hints[TumbleAg 
LogicalProject(timeSec=[endOfSecond($4)], quantity=[$6], id=[$0], time=[$2], productid=[$5], _source_time=[$4]) LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{3}]) LogicalTableScan(table=[[orders_2]]) - LogicalTableFunctionScan(invocation=[Orders.items($cor0.items)], rowType=[RecordType(INTEGER productid, INTEGER quantity, DOUBLE unit_price, DOUBLE discount)], elementType=[class [Ljava.lang.Object;]) + LogicalTableFunctionScan(invocation=[Orders.items($cor0.items)], rowType=[RecordType:peek_no_expand(INTEGER productid, INTEGER quantity, DOUBLE unit_price, DOUBLE discount)], elementType=[class [Ljava.lang.Object;]) === Orders ID: orders_2 @@ -27,7 +27,7 @@ Schema: - id: BIGINT NOT NULL - customerid: BIGINT NOT NULL - time: VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL - - items: RecordType(INTEGER NOT NULL productid, INTEGER NOT NULL quantity, DOUBLE NOT NULL unit_price, DOUBLE discount) NOT NULL ARRAY NOT NULL + - items: RecordType:peek_no_expand(INTEGER NOT NULL productid, INTEGER NOT NULL quantity, DOUBLE NOT NULL unit_price, DOUBLE discount) NOT NULL ARRAY NOT NULL - _source_time: TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) NOT NULL Plan: LogicalTableScan(table=[[orders_1]]) @@ -50,7 +50,7 @@ LogicalTableScan(table=[[orders_1]]) "CREATE TEMPORARY FUNCTION IF NOT EXISTS `endofday` AS 'com.datasqrl.time.EndOfDay' LANGUAGE JAVA;", "CREATE TEMPORARY FUNCTION IF NOT EXISTS `endofhour` AS 'com.datasqrl.time.EndOfHour' LANGUAGE JAVA;", "CREATE TEMPORARY FUNCTION IF NOT EXISTS `endofyear` AS 'com.datasqrl.time.EndOfYear' LANGUAGE JAVA;", - "CREATE TEMPORARY TABLE `orders_1` (\n `id` BIGINT NOT NULL,\n `customerid` BIGINT NOT NULL,\n `time` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,\n `items` ROW(`productid` INTEGER NOT NULL, `quantity` INTEGER NOT NULL, `unit_price` DOUBLE NOT NULL, `discount` DOUBLE) NOT NULL ARRAY NOT NULL,\n `_source_time` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL METADATA FROM 'timestamp',\n WATERMARK FOR `_source_time` AS `_source_time` - INTERVAL '0.0' SECOND\n) WITH (\n 'format' = 'avro',\n 'properties.bootstrap.servers' = '${PROPERTIES_BOOTSTRAP_SERVERS}',\n 'properties.group.id' = 'datasqrl-orders',\n 'topic' = '${sqrl:topic}',\n 'connector' = 'kafka'\n);", + "CREATE TEMPORARY TABLE `orders_1` (\n `id` BIGINT NOT NULL,\n `customerid` BIGINT NOT NULL,\n `time` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,\n `items` ROW(`productid` INTEGER NOT NULL, `quantity` INTEGER NOT NULL, `unit_price` DOUBLE NOT NULL, `discount` DOUBLE) NOT NULL ARRAY NOT NULL,\n `_source_time` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL METADATA FROM 'timestamp',\n WATERMARK FOR `_source_time` AS `_source_time` - INTERVAL '0.0' SECOND\n) WITH (\n 'properties.bootstrap.servers' = '${PROPERTIES_BOOTSTRAP_SERVERS}',\n 'connector' = 'kafka',\n 'format' = 'avro',\n 'properties.group.id' = 'datasqrl-orders',\n 'topic' = '${sqrl:topic}',\n 'timestamp_mapping.legacy' = 'false'\n);", "CREATE TEMPORARY TABLE `ordercount_1` (\n `timeSec` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL,\n `number` BIGINT NOT NULL,\n `volume` INTEGER NOT NULL,\n PRIMARY KEY (`timeSec`) NOT ENFORCED\n) WITH (\n 'password' = '${JDBC_PASSWORD}',\n 'connector' = 'jdbc-sqrl',\n 'driver' = 'org.postgresql.Driver',\n 'table-name' = 'ordercount_1',\n 'url' = '${JDBC_URL}',\n 'username' = '${JDBC_USERNAME}'\n);", "CREATE TEMPORARY TABLE `orders_2` (\n `id` BIGINT NOT NULL,\n `customerid` BIGINT NOT NULL,\n `time` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,\n `items` 
RAW('com.datasqrl.json.FlinkJsonType', 'ADFjb20uZGF0YXNxcmwuanNvbi5GbGlua0pzb25UeXBlU2VyaWFsaXplclNuYXBzaG90AAAAAQApY29tLmRhdGFzcXJsLmpzb24uRmxpbmtKc29uVHlwZVNlcmlhbGl6ZXI='),\n `_source_time` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL,\n PRIMARY KEY (`id`, `time`) NOT ENFORCED\n) WITH (\n 'password' = '${JDBC_PASSWORD}',\n 'connector' = 'jdbc-sqrl',\n 'driver' = 'org.postgresql.Driver',\n 'table-name' = 'orders_2',\n 'url' = '${JDBC_URL}',\n 'username' = '${JDBC_USERNAME}'\n);", "CREATE VIEW `table$1`\nAS\nSELECT ENDOFSECOND(`$cor0`.`_source_time`) AS `timeSec`, `t00`.`quantity`, `$cor0`.`id`, `$cor0`.`time`, `t00`.`productid`, `$cor0`.`_source_time`\nFROM `orders_1` AS `$cor0`,\n UNNEST(`$cor0`.`items`) AS `t00` (`productid`, `quantity`, `unit_price`, `discount`);", diff --git a/sqrl-testing/sqrl-integration-tests/src/test/resources/snapshots/com/datasqrl/UseCaseCompileTest/seedshop-avro--package.txt b/sqrl-testing/sqrl-integration-tests/src/test/resources/snapshots/com/datasqrl/UseCaseCompileTest/seedshop-avro--package.txt index c745899eb..dbb20f720 100644 --- a/sqrl-testing/sqrl-integration-tests/src/test/resources/snapshots/com/datasqrl/UseCaseCompileTest/seedshop-avro--package.txt +++ b/sqrl-testing/sqrl-integration-tests/src/test/resources/snapshots/com/datasqrl/UseCaseCompileTest/seedshop-avro--package.txt @@ -15,7 +15,7 @@ LogicalAggregate(group=[{0}], number=[COUNT()], volume=[SUM($1)]) hints[TumbleAg LogicalProject(timeSec=[endOfSecond($4)], quantity=[$6], id=[$0], time=[$2], productid=[$5], _source_time=[$4]) LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{3}]) LogicalTableScan(table=[[orders_2]]) - LogicalTableFunctionScan(invocation=[Orders.items($cor0.items)], rowType=[RecordType(INTEGER productid, INTEGER quantity, DOUBLE unit_price, DOUBLE discount)], elementType=[class [Ljava.lang.Object;]) + LogicalTableFunctionScan(invocation=[Orders.items($cor0.items)], rowType=[RecordType:peek_no_expand(INTEGER productid, INTEGER quantity, DOUBLE unit_price, DOUBLE discount)], elementType=[class [Ljava.lang.Object;]) === Orders ID: orders_2 @@ -27,7 +27,7 @@ Schema: - id: BIGINT NOT NULL - customerid: BIGINT NOT NULL - time: VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL - - items: RecordType(INTEGER NOT NULL productid, INTEGER NOT NULL quantity, DOUBLE NOT NULL unit_price, DOUBLE discount) NOT NULL ARRAY NOT NULL + - items: RecordType:peek_no_expand(INTEGER NOT NULL productid, INTEGER NOT NULL quantity, DOUBLE NOT NULL unit_price, DOUBLE discount) NOT NULL ARRAY NOT NULL - _source_time: TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) NOT NULL Plan: LogicalTableScan(table=[[orders_1]]) @@ -50,7 +50,7 @@ LogicalTableScan(table=[[orders_1]]) "CREATE TEMPORARY FUNCTION IF NOT EXISTS `endofday` AS 'com.datasqrl.time.EndOfDay' LANGUAGE JAVA;", "CREATE TEMPORARY FUNCTION IF NOT EXISTS `endofhour` AS 'com.datasqrl.time.EndOfHour' LANGUAGE JAVA;", "CREATE TEMPORARY FUNCTION IF NOT EXISTS `endofyear` AS 'com.datasqrl.time.EndOfYear' LANGUAGE JAVA;", - "CREATE TEMPORARY TABLE `orders_1` (\n `id` BIGINT NOT NULL,\n `customerid` BIGINT NOT NULL,\n `time` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,\n `items` ROW(`productid` INTEGER NOT NULL, `quantity` INTEGER NOT NULL, `unit_price` DOUBLE NOT NULL, `discount` DOUBLE) NOT NULL ARRAY NOT NULL,\n `_source_time` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL METADATA FROM 'timestamp',\n WATERMARK FOR `_source_time` AS `_source_time` - INTERVAL '0.0' SECOND\n) WITH (\n 'format' = 'avro',\n 'properties.bootstrap.servers' = 
'${PROPERTIES_BOOTSTRAP_SERVERS}',\n 'properties.group.id' = 'datasqrl-orders',\n 'topic' = '${sqrl:topic}',\n 'connector' = 'kafka'\n);", + "CREATE TEMPORARY TABLE `orders_1` (\n `id` BIGINT NOT NULL,\n `customerid` BIGINT NOT NULL,\n `time` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,\n `items` ROW(`productid` INTEGER NOT NULL, `quantity` INTEGER NOT NULL, `unit_price` DOUBLE NOT NULL, `discount` DOUBLE) NOT NULL ARRAY NOT NULL,\n `_source_time` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL METADATA FROM 'timestamp',\n WATERMARK FOR `_source_time` AS `_source_time` - INTERVAL '0.0' SECOND\n) WITH (\n 'properties.bootstrap.servers' = '${PROPERTIES_BOOTSTRAP_SERVERS}',\n 'connector' = 'kafka',\n 'format' = 'avro',\n 'properties.group.id' = 'datasqrl-orders',\n 'topic' = '${sqrl:topic}',\n 'timestamp_mapping.legacy' = 'false'\n);", "CREATE TEMPORARY TABLE `ordercount_1` (\n `timeSec` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL,\n `number` BIGINT NOT NULL,\n `volume` INTEGER NOT NULL,\n PRIMARY KEY (`timeSec`) NOT ENFORCED\n) WITH (\n 'password' = '${JDBC_PASSWORD}',\n 'connector' = 'jdbc-sqrl',\n 'driver' = 'org.postgresql.Driver',\n 'table-name' = 'ordercount_1',\n 'url' = '${JDBC_URL}',\n 'username' = '${JDBC_USERNAME}'\n);", "CREATE TEMPORARY TABLE `orders_2` (\n `id` BIGINT NOT NULL,\n `customerid` BIGINT NOT NULL,\n `time` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,\n `items` RAW('com.datasqrl.json.FlinkJsonType', 'ADFjb20uZGF0YXNxcmwuanNvbi5GbGlua0pzb25UeXBlU2VyaWFsaXplclNuYXBzaG90AAAAAQApY29tLmRhdGFzcXJsLmpzb24uRmxpbmtKc29uVHlwZVNlcmlhbGl6ZXI='),\n `_source_time` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL,\n PRIMARY KEY (`id`, `time`) NOT ENFORCED\n) WITH (\n 'password' = '${JDBC_PASSWORD}',\n 'connector' = 'jdbc-sqrl',\n 'driver' = 'org.postgresql.Driver',\n 'table-name' = 'orders_2',\n 'url' = '${JDBC_URL}',\n 'username' = '${JDBC_USERNAME}'\n);", "CREATE VIEW `table$1`\nAS\nSELECT ENDOFSECOND(`$cor0`.`_source_time`) AS `timeSec`, `t00`.`quantity`, `$cor0`.`id`, `$cor0`.`time`, `t00`.`productid`, `$cor0`.`_source_time`\nFROM `orders_1` AS `$cor0`,\n UNNEST(`$cor0`.`items`) AS `t00` (`productid`, `quantity`, `unit_price`, `discount`);", diff --git a/sqrl-testing/sqrl-integration-tests/src/test/resources/usecases/seedshop-avro/seedshop-avro/orders.table.json b/sqrl-testing/sqrl-integration-tests/src/test/resources/usecases/seedshop-avro/seedshop-avro/orders.table.json index 3f598bf1e..707f40aa6 100644 --- a/sqrl-testing/sqrl-integration-tests/src/test/resources/usecases/seedshop-avro/seedshop-avro/orders.table.json +++ b/sqrl-testing/sqrl-integration-tests/src/test/resources/usecases/seedshop-avro/seedshop-avro/orders.table.json @@ -5,7 +5,8 @@ "properties.bootstrap.servers": "${PROPERTIES_BOOTSTRAP_SERVERS}", "properties.group.id": "datasqrl-orders", "topic": "${sqrl:topic}", - "connector" : "kafka" + "connector" : "kafka", + "timestamp_mapping.legacy" : false }, "table" : { "type" : "source", diff --git a/sqrl-tools/sqrl-discovery/src/main/java/com/datasqrl/discovery/preprocessor/FlexibleSchemaInferencePreprocessor.java b/sqrl-tools/sqrl-discovery/src/main/java/com/datasqrl/discovery/preprocessor/FlexibleSchemaInferencePreprocessor.java index a3a18919b..d3dfdc776 100644 --- a/sqrl-tools/sqrl-discovery/src/main/java/com/datasqrl/discovery/preprocessor/FlexibleSchemaInferencePreprocessor.java +++ b/sqrl-tools/sqrl-discovery/src/main/java/com/datasqrl/discovery/preprocessor/FlexibleSchemaInferencePreprocessor.java @@ -130,7 +130,7 @@ public void discoverFile(Path 
file, ProcessorContext processorContext, ErrorColl FlexibleTableSchemaHolder schemaHolder = new FlexibleTableSchemaHolder(schema); //4. Infer primary key RelDataType rowType = SchemaToRelDataTypeFactory.load(schemaHolder) - .map(schemaHolder, tableName, errors); + .map(schemaHolder, null, tableName, errors); //We use a conservative method where each simple column is a primary key column String[] primaryKey = rowType.getFieldList().stream().filter(f -> !f.getType().isNullable() && CalciteUtil.isPotentialPrimaryKeyType(f.getType())) .map(RelDataTypeField::getName).toArray(String[]::new); From 8db68ce74e5a224414b16e6b976abbcf6077d615 Mon Sep 17 00:00:00 2001 From: Daniel Henneberger Date: Fri, 13 Sep 2024 14:05:27 -0700 Subject: [PATCH 4/5] Code cleanup Signed-off-by: Daniel Henneberger --- .../avro/AvroToRelDataTypeConverter.java | 22 +++++-------------- 1 file changed, 6 insertions(+), 16 deletions(-) diff --git a/sqrl-planner/src/main/java/com/datasqrl/io/schema/avro/AvroToRelDataTypeConverter.java b/sqrl-planner/src/main/java/com/datasqrl/io/schema/avro/AvroToRelDataTypeConverter.java index 6591850e1..c0a52cbb7 100644 --- a/sqrl-planner/src/main/java/com/datasqrl/io/schema/avro/AvroToRelDataTypeConverter.java +++ b/sqrl-planner/src/main/java/com/datasqrl/io/schema/avro/AvroToRelDataTypeConverter.java @@ -6,11 +6,11 @@ import com.datasqrl.error.ErrorCode; import com.datasqrl.error.ErrorCollector; import java.util.ArrayList; +import java.util.Collections; import java.util.IdentityHashMap; import java.util.List; -import java.util.Map; +import java.util.Set; import lombok.AllArgsConstructor; -import org.apache.avro.LogicalType; import org.apache.avro.Schema; import org.apache.avro.Schema.Field; import org.apache.avro.Schema.Type; @@ -22,14 +22,13 @@ public class AvroToRelDataTypeConverter { private final ErrorCollector errors; - private final Map<Schema, RelDataType> typeCache; + private final Set<Schema> processedSchemas; private final boolean legacyTimestampMapping; public AvroToRelDataTypeConverter(ErrorCollector errors, boolean legacyTimestampMapping) { - this(errors, new IdentityHashMap<>(), legacyTimestampMapping); + this(errors, Collections.newSetFromMap(new IdentityHashMap<>()), legacyTimestampMapping); } - //timestamp_mapping.legacy public RelDataType convert(Schema schema) { validateSchema(schema, NamePath.ROOT); @@ -44,8 +43,8 @@ public RelDataType convert(Schema schema) { private void validateSchema(Schema schema, NamePath path) { // Check if the schema has already been processed - if (typeCache.containsKey(schema)) { - throw errors.exception(ErrorCode.SCHEMA_ERROR, "Recursive schema's not yet supported: %s", path); + if (!processedSchemas.add(schema)) { + throw errors.exception(ErrorCode.SCHEMA_ERROR, "Recursive schemas are not supported: %s", path); } switch (schema.getType()) { @@ -65,13 +64,8 @@ private void validateSchema(Schema schema, NamePath path) { Schema innerSchema = nonNullTypes.get(0); validateSchema(innerSchema, path); - - typeCache.put(schema, null); break; case RECORD: - // Create a placeholder RelDataType and put it in the cache to handle recursion - typeCache.put(schema, null); - for (Field field : schema.getFields()) { validateSchema(field.schema(), path.concat(Name.system(field.name()))); @@ -79,21 +73,17 @@ private void validateSchema(Schema schema, NamePath path) { break; case ARRAY: validateSchema(schema.getElementType(), path); - typeCache.put(schema, null); break; case MAP: validateSchema(schema.getValueType(), path); - typeCache.put(schema, null); break; default: // primitives 
validatePrimitive(schema, path); - typeCache.put(schema, null); break; } } private void validatePrimitive(Schema schema, NamePath path) { - LogicalType logicalType = schema.getLogicalType(); switch (schema.getType()) { case FIXED: case ENUM: From fe52e187076a86f81e7265403d5fa3924a892923 Mon Sep 17 00:00:00 2001 From: Daniel Henneberger Date: Fri, 13 Sep 2024 14:10:14 -0700 Subject: [PATCH 5/5] Add local timestamp test Signed-off-by: Daniel Henneberger --- .../avro/AvroToRelDataTypeConverterTest.java | 24 ++++++++++++++----- 1 file changed, 18 insertions(+), 6 deletions(-) diff --git a/sqrl-planner/src/test/java/com/datasqrl/io/schema/avro/AvroToRelDataTypeConverterTest.java b/sqrl-planner/src/test/java/com/datasqrl/io/schema/avro/AvroToRelDataTypeConverterTest.java index f49243452..50fc414b4 100644 --- a/sqrl-planner/src/test/java/com/datasqrl/io/schema/avro/AvroToRelDataTypeConverterTest.java +++ b/sqrl-planner/src/test/java/com/datasqrl/io/schema/avro/AvroToRelDataTypeConverterTest.java @@ -26,7 +26,7 @@ public class AvroToRelDataTypeConverterTest { @BeforeEach public void setUp() { errors = ErrorCollector.root(); - converter = new AvroToRelDataTypeConverter(errors, true); + converter = new AvroToRelDataTypeConverter(errors, false); } @Test @@ -109,24 +109,36 @@ public void testLogicalTypes() { LogicalTypes.timeMicros().addToSchema(timeMicrosSchema); RelDataType timeMicrosType = converter.convert(timeMicrosSchema); assertEquals(SqlTypeName.TIME, timeMicrosType.getSqlTypeName()); - // Note: Flink only supports precision 3, this is converted in the RelDataTypeSystem - // so even though this gets passed 6, the resulting precision will be 3. assertEquals(0, timeMicrosType.getPrecision()); // Timestamp (millis) Schema timestampMillisSchema = Schema.create(Type.LONG); LogicalTypes.timestampMillis().addToSchema(timestampMillisSchema); RelDataType timestampMillisType = converter.convert(timestampMillisSchema); - assertEquals(SqlTypeName.TIMESTAMP, timestampMillisType.getSqlTypeName()); + assertEquals(SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE, timestampMillisType.getSqlTypeName()); assertEquals(3, timestampMillisType.getPrecision()); // Timestamp (micros) Schema timestampMicrosSchema = Schema.create(Type.LONG); LogicalTypes.timestampMicros().addToSchema(timestampMicrosSchema); RelDataType timestampMicrosType = converter.convert(timestampMicrosSchema); - assertEquals(SqlTypeName.TIMESTAMP, timestampMicrosType.getSqlTypeName()); + assertEquals(SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE, timestampMicrosType.getSqlTypeName()); assertEquals(6, timestampMicrosType.getPrecision()); + // Local Timestamp (millis) + Schema timestampLocalMillisSchema = Schema.create(Type.LONG); + LogicalTypes.localTimestampMillis().addToSchema(timestampLocalMillisSchema); + RelDataType timestampLocalMillisType = converter.convert(timestampLocalMillisSchema); + assertEquals(SqlTypeName.TIMESTAMP, timestampLocalMillisType.getSqlTypeName()); + assertEquals(3, timestampLocalMillisType.getPrecision()); + + // Local Timestamp (micros) + Schema timestampLocalMicrosSchema = Schema.create(Type.LONG); + LogicalTypes.localTimestampMicros().addToSchema(timestampLocalMicrosSchema); + RelDataType timestampLocalMicrosType = converter.convert(timestampLocalMicrosSchema); + assertEquals(SqlTypeName.TIMESTAMP, timestampLocalMicrosType.getSqlTypeName()); + assertEquals(6, timestampLocalMicrosType.getPrecision()); + // UUID Schema uuidSchema = Schema.create(Type.STRING); LogicalTypes.uuid().addToSchema(uuidSchema); @@ -313,7 +325,7 @@ public 
void testNestedLogicalTypes() { RelDataType arrayType = converter.convert(arraySchema); assertEquals(SqlTypeName.ARRAY, arrayType.getSqlTypeName()); RelDataType elementType = arrayType.getComponentType(); - assertEquals(SqlTypeName.TIMESTAMP, elementType.getSqlTypeName()); + assertEquals(SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE, elementType.getSqlTypeName()); assertEquals(6, elementType.getPrecision()); }
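
A minimal usage sketch of the timestamp mapping exercised by the tests above, for illustration only: the AvroToRelDataTypeConverter(ErrorCollector, boolean) constructor and convert(Schema) method are taken from this patch series, while the wrapper class name below is hypothetical. With 'timestamp_mapping.legacy' set to false, Avro timestamp-millis/micros convert to TIMESTAMP_WITH_LOCAL_TIME_ZONE and local-timestamp-millis/micros convert to TIMESTAMP:

    // Hypothetical wrapper class; only the converter API is from this patch series.
    import com.datasqrl.error.ErrorCollector;
    import com.datasqrl.io.schema.avro.AvroToRelDataTypeConverter;
    import org.apache.avro.LogicalTypes;
    import org.apache.avro.Schema;
    import org.apache.avro.Schema.Type;
    import org.apache.calcite.rel.type.RelDataType;

    class TimestampMappingSketch {
      public static void main(String[] args) {
        ErrorCollector errors = ErrorCollector.root();
        // 'false' selects the non-legacy mapping, matching 'timestamp_mapping.legacy' : false
        AvroToRelDataTypeConverter converter = new AvroToRelDataTypeConverter(errors, false);

        // timestamp-micros denotes an instant and converts to TIMESTAMP_WITH_LOCAL_TIME_ZONE(6)
        Schema tsMicros = Schema.create(Type.LONG);
        LogicalTypes.timestampMicros().addToSchema(tsMicros);
        RelDataType instantType = converter.convert(tsMicros);
        System.out.println(instantType.getFullTypeString());

        // local-timestamp-millis has no time zone semantics and converts to TIMESTAMP(3)
        Schema localMillis = Schema.create(Type.LONG);
        LogicalTypes.localTimestampMillis().addToSchema(localMillis);
        RelDataType localType = converter.convert(localMillis);
        System.out.println(localType.getFullTypeString());
      }
    }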