Skip to content

Commit

Permalink
[feat][sql] Support UUID for json and avro (#21267)
Browse files Browse the repository at this point in the history
### Motivation
As described in https://pulsar.apache.org/docs/3.1.x/sql-overview/, Pulsar SQL is based on [Trino (formerly Presto SQL)](https://trino.io/), which supports the UUID type. Currently, however, a UUID field in an Avro or JSON schema is interpreted as VARCHAR.

### Modifications

Support decoding UUID from Avro or JSON schemas.
  • Loading branch information
liangyepianzhou authored Oct 9, 2023
1 parent ca77982 commit 8c70943
Show file tree
Hide file tree
Showing 7 changed files with 40 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,15 @@
import io.trino.spi.type.Timestamps;
import io.trino.spi.type.TinyintType;
import io.trino.spi.type.Type;
import io.trino.spi.type.UuidType;
import io.trino.spi.type.VarbinaryType;
import io.trino.spi.type.VarcharType;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import org.apache.avro.generic.GenericEnumSymbol;
import org.apache.avro.generic.GenericFixed;
import org.apache.avro.generic.GenericRecord;
Expand All @@ -87,7 +89,8 @@ public class PulsarAvroColumnDecoder {
TimestampType.TIMESTAMP_MILLIS,
DateType.DATE,
TimeType.TIME_MILLIS,
VarbinaryType.VARBINARY);
VarbinaryType.VARBINARY,
UuidType.UUID);

private final Type columnType;
private final String columnMapping;
Expand Down Expand Up @@ -255,6 +258,10 @@ private static Slice getSlice(Object value, Type type, String columnName) {
}
}

if (type instanceof UuidType) {
return UuidType.javaUuidToTrinoUuid(UUID.fromString(value.toString()));
}

throw new TrinoException(DECODER_CONVERSION_NOT_SUPPORTED,
format("cannot decode object of '%s' as '%s' for column '%s'",
value.getClass(), type, columnName));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import io.trino.spi.type.TypeManager;
import io.trino.spi.type.TypeSignature;
import io.trino.spi.type.TypeSignatureParameter;
import io.trino.spi.type.UuidType;
import io.trino.spi.type.VarbinaryType;
import io.trino.spi.type.VarcharType;
import java.util.List;
Expand Down Expand Up @@ -121,6 +122,10 @@ private Type parseAvroPrestoType(String fieldName, Schema schema) {
LogicalType logicalType = schema.getLogicalType();
switch (type) {
case STRING:
if (logicalType != null && logicalType.equals(LogicalTypes.uuid())) {
return UuidType.UUID;
}
return createUnboundedVarcharType();
case ENUM:
return createUnboundedVarcharType();
case NULL:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import io.trino.spi.type.Timestamps;
import io.trino.spi.type.TinyintType;
import io.trino.spi.type.Type;
import io.trino.spi.type.UuidType;
import io.trino.spi.type.VarbinaryType;
import io.trino.spi.type.VarcharType;
import java.util.Iterator;
Expand Down Expand Up @@ -126,7 +127,8 @@ private boolean isSupportedType(Type type) {
TimestampType.TIMESTAMP_MILLIS,
DateType.DATE,
TimeType.TIME_MILLIS,
RealType.REAL
RealType.REAL,
UuidType.UUID
).contains(type)) {
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import io.trino.spi.type.TypeManager;
import io.trino.spi.type.TypeSignature;
import io.trino.spi.type.TypeSignatureParameter;
import io.trino.spi.type.UuidType;
import io.trino.spi.type.VarbinaryType;
import io.trino.spi.type.VarcharType;
import java.util.List;
Expand Down Expand Up @@ -121,6 +122,10 @@ private Type parseJsonPrestoType(String fieldName, Schema schema) {
LogicalType logicalType = schema.getLogicalType();
switch (type) {
case STRING:
if (logicalType != null && logicalType.equals(LogicalTypes.uuid())) {
return UuidType.UUID;
}
return createUnboundedVarcharType();
case ENUM:
return createUnboundedVarcharType();
case NULL:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pulsar.sql.presto.decoder;

import java.math.BigDecimal;
import java.util.UUID;
import lombok.Data;

import java.util.List;
Expand Down Expand Up @@ -55,6 +56,9 @@ public static enum TestEnum {
public Map<String, Long> mapField;
public CompositeRow compositeRow;

@org.apache.avro.reflect.AvroSchema("{\"type\":\"string\",\"logicalType\":\"uuid\"}")
public UUID uuidField;

public static class TestRow {
public String stringField;
public int intField;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import io.trino.spi.type.Timestamps;
import io.trino.spi.type.Type;
import io.trino.spi.type.TypeSignatureParameter;
import io.trino.spi.type.UuidType;
import io.trino.spi.type.VarcharType;
import java.math.BigDecimal;
import java.time.LocalDate;
Expand All @@ -55,6 +56,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.pulsar.client.impl.schema.AvroSchema;
import org.apache.pulsar.client.impl.schema.generic.GenericAvroRecord;
import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
Expand Down Expand Up @@ -90,6 +92,7 @@ public void testPrimitiveType() {
message.longField = 222L;
message.timestampField = System.currentTimeMillis();
message.enumField = DecoderTestMessage.TestEnum.TEST_ENUM_1;
message.uuidField = UUID.randomUUID();

LocalTime now = LocalTime.now(ZoneId.systemDefault());
message.timeField = now.toSecondOfDay() * 1000;
Expand Down Expand Up @@ -137,6 +140,10 @@ public void testPrimitiveType() {
PulsarColumnHandle timeFieldColumnHandle = new PulsarColumnHandle(getPulsarConnectorId().toString(),
"timeField", TIME_MILLIS, false, false, "timeField", null, null, PulsarColumnHandle.HandleKeyValueType.NONE);
checkValue(decodedRow, timeFieldColumnHandle, (long) message.timeField * Timestamps.PICOSECONDS_PER_MILLISECOND);

PulsarColumnHandle uuidHandle = new PulsarColumnHandle(getPulsarConnectorId().toString(),
"uuidField", UuidType.UUID, false, false, "uuidField", null, null, PulsarColumnHandle.HandleKeyValueType.NONE);
checkValue(decodedRow, uuidHandle, UuidType.javaUuidToTrinoUuid(message.uuidField));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import io.trino.spi.type.Timestamps;
import io.trino.spi.type.Type;
import io.trino.spi.type.TypeSignatureParameter;
import io.trino.spi.type.UuidType;
import io.trino.spi.type.VarcharType;
import java.math.BigDecimal;
import java.time.LocalDate;
Expand All @@ -55,6 +56,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.pulsar.client.impl.schema.JSONSchema;
import org.apache.pulsar.client.impl.schema.generic.GenericJsonRecord;
import org.apache.pulsar.client.impl.schema.generic.GenericJsonSchema;
Expand Down Expand Up @@ -98,6 +100,8 @@ public void testPrimitiveType() {
LocalDate epoch = LocalDate.ofEpochDay(0);
message.dateField = Math.toIntExact(ChronoUnit.DAYS.between(epoch, localDate));

message.uuidField = UUID.randomUUID();

ByteBuf payload = io.netty.buffer.Unpooled
.copiedBuffer(schema.encode(message));
Map<DecoderColumnHandle, FieldValueProvider> decodedRow = pulsarRowDecoder.decodeRow(payload).get();
Expand Down Expand Up @@ -137,6 +141,10 @@ public void testPrimitiveType() {
PulsarColumnHandle timeFieldColumnHandle = new PulsarColumnHandle(getPulsarConnectorId().toString(),
"timeField", TIME_MILLIS, false, false, "timeField", null, null, PulsarColumnHandle.HandleKeyValueType.NONE);
checkValue(decodedRow, timeFieldColumnHandle, (long) message.timeField * Timestamps.PICOSECONDS_PER_MILLISECOND);

PulsarColumnHandle uuidHandle = new PulsarColumnHandle(getPulsarConnectorId().toString(),
"uuidField", UuidType.UUID, false, false, "uuidField", null, null, PulsarColumnHandle.HandleKeyValueType.NONE);
checkValue(decodedRow, uuidHandle, message.uuidField.toString());
}

@Test
Expand Down

0 comments on commit 8c70943

Please sign in to comment.