Skip to content

Commit

Permalink
Return new ReflectData for both reflect datum reader and writer (#28280)
Browse files Browse the repository at this point in the history
  • Loading branch information
RustedBones authored Sep 1, 2023
1 parent 3ff66d3 commit 325efce
Showing 1 changed file with 10 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.reflect.ReflectData;
import org.apache.avro.reflect.ReflectDatumReader;
import org.apache.avro.reflect.ReflectDatumWriter;
import org.apache.avro.specific.SpecificDatumReader;
Expand All @@ -33,6 +34,9 @@
import org.checkerframework.checker.nullness.qual.Nullable;

/** Create {@link DatumReader} and {@link DatumWriter} for given schemas. */
@SuppressWarnings({
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
})
public abstract class AvroDatumFactory<T>
implements AvroSource.DatumReaderFactory<T>, AvroSink.DatumWriterFactory<T> {

Expand Down Expand Up @@ -168,25 +172,16 @@ public ReflectDatumFactory(Class<T> type) {

@Override
public DatumReader<T> apply(Schema writer, Schema reader) {
// create the datum writer using the Class<T> api.
// avro will load the proper class loader
ReflectDatumReader<T> datumReader = new ReflectDatumReader<>(type);
datumReader.setExpected(reader);
datumReader.setSchema(writer);
// for backward compat, add logical type support by default
AvroUtils.addLogicalTypeConversions(datumReader.getData());
return datumReader;
ReflectData data = new ReflectData(type.getClassLoader());
AvroUtils.addLogicalTypeConversions(data);
return new ReflectDatumReader<>(writer, reader, data);
}

@Override
public DatumWriter<T> apply(Schema writer) {
// create the datum writer using the Class<T> api.
// avro will load the proper class loader
ReflectDatumWriter<T> datumWriter = new ReflectDatumWriter<>(type);
datumWriter.setSchema(writer);
// for backward compat, add logical type support by default
AvroUtils.addLogicalTypeConversions(datumWriter.getData());
return datumWriter;
ReflectData data = new ReflectData(type.getClassLoader());
AvroUtils.addLogicalTypeConversions(data);
return new ReflectDatumWriter<>(writer, data);
}

public static <T> ReflectDatumFactory<T> of(Class<T> type) {
Expand Down

0 comments on commit 325efce

Please sign in to comment.