Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update AvroWrapper to expose logical type readers and converters #19

Merged
merged 1 commit into from
May 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public class AvroReader {
* <li>timestamp-micros</li>
* </ul>
*/
private static final Map<String, LogicalTypeReader> DEFAULT_LOGICAL_TYPE_READERS;
public static final Map<String, LogicalTypeReader> DEFAULT_LOGICAL_TYPE_READERS;

/**
* A {@link Map} of {@link LogicalTypeReader}s used to interpret Avro logical types.
Expand Down
20 changes: 10 additions & 10 deletions twister-avro/src/main/java/dev/twister/avro/AvroSchemaInferrer.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.temporal.ChronoUnit;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

/**
* A utility class to infer Avro schema from Java objects.
Expand All @@ -30,26 +30,26 @@ public class AvroSchemaInferrer {
private final boolean mapAsRecord;

/**
* A ChronoUnit to determine the precision of time-based Avro logical types.
* A TimeUnit to determine the precision of time-based Avro logical types.
* It must be either MILLIS or MICROS.
*/
private final ChronoUnit timePrecision;
private final TimeUnit timePrecision;

/**
* Creates an AvroSchemaInferrer with the default behavior of treating maps as records.
*/
public AvroSchemaInferrer() {
this(true, ChronoUnit.MILLIS);
this(true, TimeUnit.MILLISECONDS);
}

/**
* Creates an AvroSchemaInferrer.
*
* @param mapAsRecord A flag to indicate whether maps should be treated as records.
* @param timePrecision A ChronoUnit to determine the precision of time-based Avro logical types.
* @param timePrecision A TimeUnit to determine the precision of time-based Avro logical types.
*/
public AvroSchemaInferrer(boolean mapAsRecord, ChronoUnit timePrecision) {
if (timePrecision != ChronoUnit.MILLIS && timePrecision != ChronoUnit.MICROS) {
public AvroSchemaInferrer(boolean mapAsRecord, TimeUnit timePrecision) {
if (timePrecision != TimeUnit.MILLISECONDS && timePrecision != TimeUnit.MICROSECONDS) {
throw new IllegalArgumentException("Unsupported time precision: " + timePrecision);
}
this.mapAsRecord = mapAsRecord;
Expand Down Expand Up @@ -125,19 +125,19 @@ private Schema getSchemaBasedOnObjectType(Object value, String fieldName, String
} else if (value instanceof LocalDate) {
schema = LogicalTypes.date().addToSchema(Schema.create(Schema.Type.INT));
} else if (value instanceof LocalTime) {
if (timePrecision == ChronoUnit.MILLIS) {
if (timePrecision == TimeUnit.MILLISECONDS) {
schema = LogicalTypes.timeMillis().addToSchema(Schema.create(Schema.Type.INT));
} else {
schema = LogicalTypes.timeMicros().addToSchema(Schema.create(Schema.Type.LONG));
}
} else if (value instanceof Instant) {
if (timePrecision == ChronoUnit.MILLIS) {
if (timePrecision == TimeUnit.MILLISECONDS) {
schema = LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG));
} else {
schema = LogicalTypes.timestampMicros().addToSchema(Schema.create(Schema.Type.LONG));
}
} else if (value instanceof LocalDateTime) {
if (timePrecision == ChronoUnit.MILLIS) {
if (timePrecision == TimeUnit.MILLISECONDS) {
schema = LogicalTypes.localTimestampMillis().addToSchema(Schema.create(Schema.Type.LONG));
} else {
schema = LogicalTypes.localTimestampMicros().addToSchema(Schema.create(Schema.Type.LONG));
Expand Down
66 changes: 66 additions & 0 deletions twister-avro/src/main/java/dev/twister/avro/AvroWrapper.java
Original file line number Diff line number Diff line change
@@ -1,24 +1,48 @@
package dev.twister.avro;

import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericFixed;
import org.apache.avro.generic.IndexedRecord;

import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.ZoneOffset;
import java.util.AbstractList;
import java.util.AbstractMap;
import java.util.AbstractSet;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

/**
* This class provides functionality to wrap Avro IndexedRecord objects
* into a Map representation for easier data manipulation.
*/
public class AvroWrapper {

private static final Map<String, LogicalTypeConverter> DEFAULT_LOGICAL_TYPE_CONVERTERS;

private final Map<String, LogicalTypeConverter> logicalTypeConverters;

public AvroWrapper() {
this(DEFAULT_LOGICAL_TYPE_CONVERTERS);
}

public AvroWrapper(Map<String, LogicalTypeConverter> logicalTypeConverters) {
this.logicalTypeConverters = logicalTypeConverters;
}

/**
* Wraps the given IndexedRecord into a Map.
*
Expand All @@ -39,6 +63,13 @@ public Map<String, Object> wrap(IndexedRecord record) {
* @return the coerced value, or throws IllegalArgumentException if the Avro type is unsupported
*/
private Object coerceType(Schema schema, Object value) {
if (schema.getLogicalType() != null) {
LogicalTypeConverter converter = logicalTypeConverters.get(schema.getLogicalType().getName());
if (converter != null) {
return converter.convert(value);
}
}

switch (schema.getType()) {
case RECORD:
return new Facade((IndexedRecord) value);
Expand Down Expand Up @@ -207,4 +238,39 @@ public int size() {
};
}
}

public interface LogicalTypeConverter {
Object convert(Object value);
}

static {
DEFAULT_LOGICAL_TYPE_CONVERTERS = Map.of(
"decimal", value -> {
GenericData.Fixed fixed = (GenericData.Fixed) value;
byte[] bytes = fixed.bytes();
int scale = ((LogicalTypes.Decimal) fixed.getSchema().getLogicalType()).getScale();
byte[] valueBytes = Arrays.copyOfRange(bytes, 0, bytes.length);
return new BigDecimal(new BigInteger(valueBytes), scale);
},
"uuid", value -> UUID.fromString((String) value),
"date", value -> LocalDate.ofEpochDay((int) value),
"time-millis", value -> LocalTime.ofNanoOfDay(TimeUnit.MILLISECONDS.toNanos((int) value)),
"time-micros", value -> LocalTime.ofNanoOfDay(TimeUnit.MICROSECONDS.toNanos((long) value)),
"timestamp-millis", value -> Instant.ofEpochMilli((long) value),
"timestamp-micros", value -> Instant.ofEpochSecond(0, (long) value * 1000),
"local-timestamp-millis", value -> {
long longValue = (long) value;
long seconds = longValue / 1000;
int nanos = (int) (longValue % 1000) * 1000000;
return LocalDateTime.ofEpochSecond(seconds, nanos, ZoneOffset.UTC);
},
"local-timestamp-micros", value -> {
long longValue = (long) value;
long seconds = longValue / 1000000;
int nanos = (int) (longValue % 1000000) * 1000;
return LocalDateTime.ofEpochSecond(seconds, nanos, ZoneOffset.UTC);
}
);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@
import org.apache.avro.SchemaBuilder;

import java.math.BigDecimal;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

public class AvroSchemaInferrerTest extends TestCase {
public void testSchemaInferrer() {
Expand Down Expand Up @@ -66,7 +66,7 @@ public void testSchemaWithMapAsMap() {
map.put("field3", 45.67);

// Create an AvroSchemaInferrer with mapAsRecord = false
AvroSchemaInferrer inferrer = new AvroSchemaInferrer(false, ChronoUnit.MILLIS);
AvroSchemaInferrer inferrer = new AvroSchemaInferrer(false, TimeUnit.MILLISECONDS);

// Infer the Avro schema for the map
Schema schema = inferrer.infer(map, "TestRecord");
Expand Down
133 changes: 133 additions & 0 deletions twister-avro/src/test/java/dev/twister/avro/AvroWrapperTest.java
Original file line number Diff line number Diff line change
@@ -1,13 +1,20 @@
package dev.twister.avro;

import junit.framework.TestCase;
import org.apache.avro.Conversions;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;

import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.time.*;
import java.time.temporal.ChronoUnit;
import java.time.temporal.TemporalUnit;
import java.util.*;
import java.util.concurrent.TimeUnit;

public class AvroWrapperTest extends TestCase {
public void testWrapPrimitives() {
Expand Down Expand Up @@ -194,4 +201,130 @@ public void testListOfRecords() {
Map<String, Object> subRecord2Map = (Map<String, Object>) list.get(1);
assertEquals(2, subRecord2Map.get("subField"));
}

public void testWrapLogicalDate() {
Schema dateSchema = LogicalTypes.date().addToSchema(Schema.create(Schema.Type.INT));
Schema recordSchema = Schema.createRecord("TestRecord", "", "", false);
recordSchema.setFields(Collections.singletonList(new Schema.Field("testDate", dateSchema, "", null)));

GenericRecord record = new GenericData.Record(recordSchema);
LocalDate testDate = LocalDate.now();
record.put("testDate", (int) testDate.toEpochDay());

AvroWrapper wrapper = new AvroWrapper();
Map<String, Object> result = wrapper.wrap(record);

assertEquals(testDate, result.get("testDate"));
}

public void testWrapLogicalTimestampMillis() {
Schema timestampMillisSchema = LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG));
Schema recordSchema = Schema.createRecord("TestRecord", "", "", false);
recordSchema.setFields(Collections.singletonList(new Schema.Field("testTimestampMillis", timestampMillisSchema, "", null)));

GenericRecord record = new GenericData.Record(recordSchema);
Instant now = Instant.now();
record.put("testTimestampMillis", now.toEpochMilli());

AvroWrapper wrapper = new AvroWrapper();
Map<String, Object> result = wrapper.wrap(record);

assertEquals(now.truncatedTo(ChronoUnit.MILLIS), result.get("testTimestampMillis"));
}

public void testWrapLogicalTimestampMicros() {
Schema timestampMicrosSchema = LogicalTypes.timestampMicros().addToSchema(Schema.create(Schema.Type.LONG));
Schema recordSchema = Schema.createRecord("TestRecord", "", "", false);
recordSchema.setFields(Collections.singletonList(new Schema.Field("testTimestampMicros", timestampMicrosSchema, "", null)));

GenericRecord record = new GenericData.Record(recordSchema);
Instant now = Instant.now();
long epochMicros = TimeUnit.SECONDS.toMicros(now.getEpochSecond()) + now.getNano() / 1000;
record.put("testTimestampMicros", epochMicros);

AvroWrapper wrapper = new AvroWrapper();
Map<String, Object> result = wrapper.wrap(record);

assertEquals(now.truncatedTo(ChronoUnit.MICROS), result.get("testTimestampMicros"));
}

public void testWrapLogicalLocalTimestampMillis() {
Schema localTimestampMillisSchema = LogicalTypes.localTimestampMillis().addToSchema(Schema.create(Schema.Type.LONG));
Schema recordSchema = Schema.createRecord("TestRecord", "", "", false);
recordSchema.setFields(Collections.singletonList(new Schema.Field("testLocalTimestampMillis", localTimestampMillisSchema, "", null)));

GenericRecord record = new GenericData.Record(recordSchema);
LocalDateTime now = LocalDateTime.now();
long epochMillis = now.atZone(ZoneOffset.UTC).toInstant().toEpochMilli();
record.put("testLocalTimestampMillis", epochMillis);

AvroWrapper wrapper = new AvroWrapper();
Map<String, Object> result = wrapper.wrap(record);

assertEquals(now.truncatedTo(ChronoUnit.MILLIS), result.get("testLocalTimestampMillis"));
}

public void testWrapLogicalLocalTimestampMicros() {
Schema localTimestampMicrosSchema = LogicalTypes.localTimestampMicros().addToSchema(Schema.create(Schema.Type.LONG));
Schema recordSchema = Schema.createRecord("TestRecord", "", "", false);
recordSchema.setFields(Collections.singletonList(new Schema.Field("testLocalTimestampMicros", localTimestampMicrosSchema, "", null)));

GenericRecord record = new GenericData.Record(recordSchema);
LocalDateTime now = LocalDateTime.now();
long epochMicros = TimeUnit.SECONDS.toMicros(now.atZone(ZoneOffset.UTC).toEpochSecond()) + now.getNano() / 1000;
record.put("testLocalTimestampMicros", epochMicros);

AvroWrapper wrapper = new AvroWrapper();
Map<String, Object> result = wrapper.wrap(record);

assertEquals(now.truncatedTo(ChronoUnit.MICROS), result.get("testLocalTimestampMicros"));
}

public void testWrapLogicalTimeMillis() {
Schema timeMillisSchema = LogicalTypes.timeMillis().addToSchema(Schema.create(Schema.Type.INT));
Schema recordSchema = Schema.createRecord("TestRecord", "", "", false);
recordSchema.setFields(Collections.singletonList(new Schema.Field("testTimeMillis", timeMillisSchema, "", null)));

GenericRecord record = new GenericData.Record(recordSchema);
LocalTime now = LocalTime.now();
int millisOfDay = (int) (TimeUnit.SECONDS.toMillis(now.toSecondOfDay()) + (now.getNano() / 1_000_000));
record.put("testTimeMillis", millisOfDay);

AvroWrapper wrapper = new AvroWrapper();
Map<String, Object> result = wrapper.wrap(record);

assertEquals(now.truncatedTo(ChronoUnit.MILLIS), result.get("testTimeMillis"));
}

public void testWrapLogicalTimeMicros() {
Schema timeMicrosSchema = LogicalTypes.timeMicros().addToSchema(Schema.create(Schema.Type.LONG));
Schema recordSchema = Schema.createRecord("TestRecord", "", "", false);
recordSchema.setFields(Collections.singletonList(new Schema.Field("testTimeMicros", timeMicrosSchema, "", null)));

GenericRecord record = new GenericData.Record(recordSchema);
LocalTime now = LocalTime.now();
long microsOfDay = TimeUnit.SECONDS.toMicros(now.toSecondOfDay()) + now.getNano() / 1000;
record.put("testTimeMicros", microsOfDay);

AvroWrapper wrapper = new AvroWrapper();
Map<String, Object> result = wrapper.wrap(record);

assertEquals(now.truncatedTo(ChronoUnit.MICROS), result.get("testTimeMicros"));
}

public void testWrapLogicalDecimal() {
LogicalTypes.Decimal decimalLogicalType = LogicalTypes.decimal(9, 2);
Schema decimalSchema = decimalLogicalType.addToSchema(Schema.createFixed("TestDecimal", "", "", 5));
Schema recordSchema = Schema.createRecord("TestRecord", "", "", false);
recordSchema.setFields(Collections.singletonList(new Schema.Field("testDecimal", decimalSchema, "", null)));

GenericRecord record = new GenericData.Record(recordSchema);
BigDecimal decimalValue = new BigDecimal("12345.67");
record.put("testDecimal", new Conversions.DecimalConversion().toFixed(decimalValue, decimalSchema, decimalLogicalType));

AvroWrapper wrapper = new AvroWrapper();
Map<String, Object> result = wrapper.wrap(record);

assertEquals(decimalValue, result.get("testDecimal"));
}
}