diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/avro/PulsarAvroColumnDecoder.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/avro/PulsarAvroColumnDecoder.java index 73081f8948a51..1672d5f144817 100644 --- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/avro/PulsarAvroColumnDecoder.java +++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/avro/PulsarAvroColumnDecoder.java @@ -54,6 +54,7 @@ import io.trino.spi.type.Timestamps; import io.trino.spi.type.TinyintType; import io.trino.spi.type.Type; +import io.trino.spi.type.UuidType; import io.trino.spi.type.VarbinaryType; import io.trino.spi.type.VarcharType; import java.math.BigInteger; @@ -61,6 +62,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.UUID; import org.apache.avro.generic.GenericEnumSymbol; import org.apache.avro.generic.GenericFixed; import org.apache.avro.generic.GenericRecord; @@ -87,7 +89,8 @@ public class PulsarAvroColumnDecoder { TimestampType.TIMESTAMP_MILLIS, DateType.DATE, TimeType.TIME_MILLIS, - VarbinaryType.VARBINARY); + VarbinaryType.VARBINARY, + UuidType.UUID); private final Type columnType; private final String columnMapping; @@ -255,6 +258,10 @@ private static Slice getSlice(Object value, Type type, String columnName) { } } + if (type instanceof UuidType) { + return UuidType.javaUuidToTrinoUuid(UUID.fromString(value.toString())); + } + throw new TrinoException(DECODER_CONVERSION_NOT_SUPPORTED, format("cannot decode object of '%s' as '%s' for column '%s'", value.getClass(), type, columnName)); diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/avro/PulsarAvroRowDecoderFactory.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/avro/PulsarAvroRowDecoderFactory.java index 3072bf9441b2c..e6eb6b7f2f947 100644 --- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/avro/PulsarAvroRowDecoderFactory.java +++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/avro/PulsarAvroRowDecoderFactory.java @@ -44,6 +44,7 @@ import io.trino.spi.type.TypeManager; import io.trino.spi.type.TypeSignature; import io.trino.spi.type.TypeSignatureParameter; +import io.trino.spi.type.UuidType; import io.trino.spi.type.VarbinaryType; import io.trino.spi.type.VarcharType; import java.util.List; @@ -121,6 +122,10 @@ private Type parseAvroPrestoType(String fieldName, Schema schema) { LogicalType logicalType = schema.getLogicalType(); switch (type) { case STRING: + if (logicalType != null && logicalType.equals(LogicalTypes.uuid())) { + return UuidType.UUID; + } + return createUnboundedVarcharType(); case ENUM: return createUnboundedVarcharType(); case NULL: diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/json/PulsarJsonFieldDecoder.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/json/PulsarJsonFieldDecoder.java index 905e3bd6becb4..8e744e3b1229c 100644 --- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/json/PulsarJsonFieldDecoder.java +++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/json/PulsarJsonFieldDecoder.java @@ -58,6 +58,7 @@ import io.trino.spi.type.Timestamps; import io.trino.spi.type.TinyintType; import io.trino.spi.type.Type; +import io.trino.spi.type.UuidType; import io.trino.spi.type.VarbinaryType; import io.trino.spi.type.VarcharType; import java.util.Iterator; @@ -126,7 +127,8 @@ private boolean isSupportedType(Type type) { TimestampType.TIMESTAMP_MILLIS, DateType.DATE, TimeType.TIME_MILLIS, - RealType.REAL + RealType.REAL, + UuidType.UUID ).contains(type)) { return true; } diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/json/PulsarJsonRowDecoderFactory.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/json/PulsarJsonRowDecoderFactory.java index 0d5cc2d262dfe..737eb608d82d6 100644 --- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/json/PulsarJsonRowDecoderFactory.java +++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/json/PulsarJsonRowDecoderFactory.java @@ -44,6 +44,7 @@ import io.trino.spi.type.TypeManager; import io.trino.spi.type.TypeSignature; import io.trino.spi.type.TypeSignatureParameter; +import io.trino.spi.type.UuidType; import io.trino.spi.type.VarbinaryType; import io.trino.spi.type.VarcharType; import java.util.List; @@ -121,6 +122,10 @@ private Type parseJsonPrestoType(String fieldName, Schema schema) { LogicalType logicalType = schema.getLogicalType(); switch (type) { case STRING: + if (logicalType != null && logicalType.equals(LogicalTypes.uuid())) { + return UuidType.UUID; + } + return createUnboundedVarcharType(); case ENUM: return createUnboundedVarcharType(); case NULL: diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/DecoderTestMessage.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/DecoderTestMessage.java index 4561282c67196..0dec76b3d4dec 100644 --- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/DecoderTestMessage.java +++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/DecoderTestMessage.java @@ -19,6 +19,7 @@ package org.apache.pulsar.sql.presto.decoder; import java.math.BigDecimal; +import java.util.UUID; import lombok.Data; import java.util.List; @@ -55,6 +56,9 @@ public static enum TestEnum { public Map mapField; public CompositeRow compositeRow; + @org.apache.avro.reflect.AvroSchema("{\"type\":\"string\",\"logicalType\":\"uuid\"}") + public UUID uuidField; + public static class TestRow { public String stringField; public int intField; diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/avro/TestAvroDecoder.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/avro/TestAvroDecoder.java index c4e7009b9465b..5f9df96619b9f 100644 --- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/avro/TestAvroDecoder.java +++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/avro/TestAvroDecoder.java @@ -44,6 +44,7 @@ import io.trino.spi.type.Timestamps; import io.trino.spi.type.Type; import io.trino.spi.type.TypeSignatureParameter; +import io.trino.spi.type.UuidType; import io.trino.spi.type.VarcharType; import java.math.BigDecimal; import java.time.LocalDate; @@ -55,6 +56,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.UUID; import org.apache.pulsar.client.impl.schema.AvroSchema; import org.apache.pulsar.client.impl.schema.generic.GenericAvroRecord; import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema; @@ -90,6 +92,7 @@ public void testPrimitiveType() { message.longField = 222L; message.timestampField = System.currentTimeMillis(); message.enumField = DecoderTestMessage.TestEnum.TEST_ENUM_1; + message.uuidField = UUID.randomUUID(); LocalTime now = LocalTime.now(ZoneId.systemDefault()); message.timeField = now.toSecondOfDay() * 1000; @@ -137,6 +140,10 @@ public void testPrimitiveType() { PulsarColumnHandle timeFieldColumnHandle = new PulsarColumnHandle(getPulsarConnectorId().toString(), "timeField", TIME_MILLIS, false, false, "timeField", null, null, PulsarColumnHandle.HandleKeyValueType.NONE); checkValue(decodedRow, timeFieldColumnHandle, (long) message.timeField * Timestamps.PICOSECONDS_PER_MILLISECOND); + + PulsarColumnHandle uuidHandle = new PulsarColumnHandle(getPulsarConnectorId().toString(), + "uuidField", UuidType.UUID, false, false, "uuidField", null, null, PulsarColumnHandle.HandleKeyValueType.NONE); + checkValue(decodedRow, uuidHandle, UuidType.javaUuidToTrinoUuid(message.uuidField)); } @Test diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/json/TestJsonDecoder.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/json/TestJsonDecoder.java index 4afad9b318fc5..32e71a53444cf 100644 --- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/json/TestJsonDecoder.java +++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/json/TestJsonDecoder.java @@ -44,6 +44,7 @@ import io.trino.spi.type.Timestamps; import io.trino.spi.type.Type; import io.trino.spi.type.TypeSignatureParameter; +import io.trino.spi.type.UuidType; import io.trino.spi.type.VarcharType; import java.math.BigDecimal; import java.time.LocalDate; @@ -55,6 +56,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.UUID; import org.apache.pulsar.client.impl.schema.JSONSchema; import org.apache.pulsar.client.impl.schema.generic.GenericJsonRecord; import org.apache.pulsar.client.impl.schema.generic.GenericJsonSchema; @@ -98,6 +100,8 @@ public void testPrimitiveType() { LocalDate epoch = LocalDate.ofEpochDay(0); message.dateField = Math.toIntExact(ChronoUnit.DAYS.between(epoch, localDate)); + message.uuidField = UUID.randomUUID(); + ByteBuf payload = io.netty.buffer.Unpooled .copiedBuffer(schema.encode(message)); Map decodedRow = pulsarRowDecoder.decodeRow(payload).get(); @@ -137,6 +141,10 @@ public void testPrimitiveType() { PulsarColumnHandle timeFieldColumnHandle = new PulsarColumnHandle(getPulsarConnectorId().toString(), "timeField", TIME_MILLIS, false, false, "timeField", null, null, PulsarColumnHandle.HandleKeyValueType.NONE); checkValue(decodedRow, timeFieldColumnHandle, (long) message.timeField * Timestamps.PICOSECONDS_PER_MILLISECOND); + + PulsarColumnHandle uuidHandle = new PulsarColumnHandle(getPulsarConnectorId().toString(), + "uuidField", UuidType.UUID, false, false, "uuidField", null, null, PulsarColumnHandle.HandleKeyValueType.NONE); + checkValue(decodedRow, uuidHandle, message.uuidField.toString()); } @Test