Skip to content

Commit

Permalink
Added a flag in FastDatumReader/FastDatumWriter to indicate whether f…
Browse files Browse the repository at this point in the history
…ast class is being used or not (#194)
  • Loading branch information
gaojieliu authored Sep 10, 2021
1 parent 6c74e98 commit 88b831d
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,7 @@ public T read(T reuse, Decoder in) throws IOException {
fastDeserializer = cachedFastDeserializer;
} else {
fastDeserializer = getFastDeserializerFromCache(cache, writerSchema, readerSchema);
if (fastDeserializer instanceof FastSerdeCache.FastDeserializerWithAvroSpecificImpl
|| fastDeserializer instanceof FastSerdeCache.FastDeserializerWithAvroGenericImpl) {
if (!isFastDeserializer(fastDeserializer)) {
// don't cache
} else {
cachedFastDeserializer = fastDeserializer;
Expand All @@ -98,4 +97,20 @@ protected FastDeserializer<T> getFastDeserializerFromCache(FastSerdeCache fastSe
protected FastDeserializer<T> getRegularAvroImpl(Schema writerSchema, Schema readerSchema) {
return new FastSerdeCache.FastDeserializerWithAvroGenericImpl<>(writerSchema, readerSchema);
}

private static boolean isFastDeserializer(FastDeserializer deserializer) {
return !(deserializer instanceof FastSerdeCache.FastDeserializerWithAvroSpecificImpl
|| deserializer instanceof FastSerdeCache.FastDeserializerWithAvroGenericImpl);
}

/**
* Return a flag to indicate whether fast deserializer is being used or not.
* @return
*/
public boolean isFastDeserializerUsed() {
if (cachedFastDeserializer == null) {
return false;
}
return isFastDeserializer(cachedFastDeserializer);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,7 @@ public void write(T data, Encoder out) throws IOException {
fastSerializer = cachedFastSerializer;
} else {
fastSerializer = getFastSerializerFromCache(cache, writerSchema);
if (fastSerializer instanceof FastSerdeCache.FastSerializerWithAvroSpecificImpl
|| fastSerializer instanceof FastSerdeCache.FastSerializerWithAvroGenericImpl) {
if (!isFastSerializer(fastSerializer)) {
// don't cache
} else {
cachedFastSerializer = fastSerializer;
Expand All @@ -76,4 +75,20 @@ protected FastSerializer<T> getFastSerializerFromCache(FastSerdeCache fastSerdeC
protected FastSerializer<T> getRegularAvroImpl(Schema schema) {
return new FastSerdeCache.FastSerializerWithAvroGenericImpl<>(schema);
}

private static boolean isFastSerializer(FastSerializer serializer) {
return !(serializer instanceof FastSerdeCache.FastSerializerWithAvroSpecificImpl
|| serializer instanceof FastSerdeCache.FastSerializerWithAvroGenericImpl);
}

/**
* Return a flag to indicate whether fast serializer is being used or not.
* @return
*/
public boolean isFastSerializerUsed() {
if (cachedFastSerializer == null) {
return false;
}
return isFastSerializer(cachedFastSerializer);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,16 @@ public void shouldCreateGenericDatumReader() throws IOException, InterruptedExce
Schema recordSchema = createRecord("TestSchema", createPrimitiveUnionFieldSchema("test", Schema.Type.STRING));
FastGenericDatumReader<GenericRecord> fastGenericDatumReader = new FastGenericDatumReader<>(recordSchema, cache);

Assert.assertFalse(fastGenericDatumReader.isFastDeserializerUsed(), "FastGenericDatumReader"
+ " shouldn't use the fast deserializer when firstly created");

GenericRecord record = new GenericData.Record(recordSchema);
record.put("test", "test");

// when
fastGenericDatumReader.read(null, FastSerdeTestsSupport.genericDataAsDecoder(record));
Assert.assertFalse(fastGenericDatumReader.isFastDeserializerUsed(), "FastGenericDatumReader shouldn't"
+ " use the fast deserializer during fast class generation");

// then
FastDeserializer<GenericRecord> fastGenericDeserializer =
Expand All @@ -96,5 +101,13 @@ public void shouldCreateGenericDatumReader() throws IOException, InterruptedExce
Assert.assertNotEquals(2, fastGenericDeserializer.getClass().getDeclaredMethods().length);
Assert.assertEquals(new Utf8("test"),
fastGenericDatumReader.read(null, FastSerdeTestsSupport.genericDataAsDecoder(record)).get("test"));

// Block fast class generation
cache.buildFastGenericDeserializer(recordSchema, recordSchema);
// Run the de-serialization again
Assert.assertEquals(new Utf8("test"),
fastGenericDatumReader.read(null, FastSerdeTestsSupport.genericDataAsDecoder(record)).get("test"));
Assert.assertTrue(fastGenericDatumReader.isFastDeserializerUsed(), "FastGenericDatumReader should be using"
+ " Fast Deserializer when the fast deserializer generation is done.");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.util.Utf8;
import org.testng.Assert;
import org.testng.annotations.BeforeTest;
import org.testng.annotations.Test;
Expand Down Expand Up @@ -52,13 +53,18 @@ public void shouldCreateSpecificDatumWriter() throws IOException, InterruptedExc
@SuppressWarnings("unchecked")
public void shouldCreateGenericDatumReader() throws IOException, InterruptedException {
Schema recordSchema = createRecord("TestSchema", createPrimitiveUnionFieldSchema("test", Schema.Type.STRING));
FastGenericDatumWriter<GenericRecord> fastGenericDatumReader = new FastGenericDatumWriter<>(recordSchema, cache);
FastGenericDatumWriter<GenericRecord> fastGenericDatumWriter = new FastGenericDatumWriter<>(recordSchema, cache);

Assert.assertFalse(fastGenericDatumWriter.isFastSerializerUsed(), "FastGenericDatumWriter"
+ " shouldn't use the fast serializer when firstly created");

GenericRecord record = new GenericData.Record(recordSchema);
record.put("test", "test");

// when
fastGenericDatumReader.write(record, AvroCompatibilityHelper.newBinaryEncoder(new ByteArrayOutputStream(), true, null));
fastGenericDatumWriter.write(record, AvroCompatibilityHelper.newBinaryEncoder(new ByteArrayOutputStream(), true, null));
Assert.assertFalse(fastGenericDatumWriter.isFastSerializerUsed(), "FastGenericDatumWriter shouldn't"
+ " use the fast serializer during fast class generation");

// then
FastSerializer<GenericRecord> fastGenericSerializer =
Expand All @@ -68,5 +74,11 @@ public void shouldCreateGenericDatumReader() throws IOException, InterruptedExce

Assert.assertNotNull(fastGenericSerializer);
Assert.assertNotEquals(2, fastGenericSerializer.getClass().getDeclaredMethods().length);

// Block fast class generation
cache.buildFastGenericSerializer(recordSchema);
fastGenericDatumWriter.write(record, AvroCompatibilityHelper.newBinaryEncoder(new ByteArrayOutputStream(), true, null));
Assert.assertTrue(fastGenericDatumWriter.isFastSerializerUsed(), "FastGenericDatumWriter should be using"
+ " Fast Serializer when the fast deserializer generation is done.");
}
}

0 comments on commit 88b831d

Please sign in to comment.