From 88b831d314c03386234b8688521af8b86ab5c86b Mon Sep 17 00:00:00 2001
From: gaojieliu
Date: Fri, 10 Sep 2021 10:52:43 -0700
Subject: [PATCH] Added a flag in FastDatumReader/FastDatumWriter to indicate whether fast class is being used or not (#194)

---
 .../fastserde/FastGenericDatumReader.java     | 19 +++++++++++++++++--
 .../fastserde/FastGenericDatumWriter.java     | 19 +++++++++++++++++--
 .../avro/fastserde/FastDatumReaderTest.java   | 13 +++++++++++++
 .../avro/fastserde/FastDatumWriterTest.java   | 16 ++++++++++++++--
 4 files changed, 61 insertions(+), 6 deletions(-)

diff --git a/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/FastGenericDatumReader.java b/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/FastGenericDatumReader.java
index 3c440259a..6d8393525 100644
--- a/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/FastGenericDatumReader.java
+++ b/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/FastGenericDatumReader.java
@@ -75,8 +75,7 @@ public T read(T reuse, Decoder in) throws IOException {
       fastDeserializer = cachedFastDeserializer;
     } else {
       fastDeserializer = getFastDeserializerFromCache(cache, writerSchema, readerSchema);
-      if (fastDeserializer instanceof FastSerdeCache.FastDeserializerWithAvroSpecificImpl
-          || fastDeserializer instanceof FastSerdeCache.FastDeserializerWithAvroGenericImpl) {
+      if (!isFastDeserializer(fastDeserializer)) {
         // don't cache
       } else {
         cachedFastDeserializer = fastDeserializer;
@@ -98,4 +97,20 @@ protected FastDeserializer getFastDeserializerFromCache(FastSerdeCache fastSe
   protected FastDeserializer getRegularAvroImpl(Schema writerSchema, Schema readerSchema) {
     return new FastSerdeCache.FastDeserializerWithAvroGenericImpl<>(writerSchema, readerSchema);
   }
+
+  private static boolean isFastDeserializer(FastDeserializer deserializer) {
+    return !(deserializer instanceof FastSerdeCache.FastDeserializerWithAvroSpecificImpl
+        || deserializer instanceof FastSerdeCache.FastDeserializerWithAvroGenericImpl);
+  }
+
+  /**
+   * Return a flag to indicate whether fast deserializer is being used or not.
+   * @return {@code true} once {@code read} has cached a deserializer that is not one of the regular-Avro wrapper implementations; {@code false} otherwise
+   */
+  public boolean isFastDeserializerUsed() {
+    if (cachedFastDeserializer == null) {
+      return false;
+    }
+    return isFastDeserializer(cachedFastDeserializer);
+  }
 }
diff --git a/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/FastGenericDatumWriter.java b/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/FastGenericDatumWriter.java
index 25f495bd6..1aa17117d 100644
--- a/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/FastGenericDatumWriter.java
+++ b/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/FastGenericDatumWriter.java
@@ -55,8 +55,7 @@ public void write(T data, Encoder out) throws IOException {
       fastSerializer = cachedFastSerializer;
     } else {
       fastSerializer = getFastSerializerFromCache(cache, writerSchema);
-      if (fastSerializer instanceof FastSerdeCache.FastSerializerWithAvroSpecificImpl
-          || fastSerializer instanceof FastSerdeCache.FastSerializerWithAvroGenericImpl) {
+      if (!isFastSerializer(fastSerializer)) {
         // don't cache
       } else {
         cachedFastSerializer = fastSerializer;
@@ -76,4 +75,20 @@ protected FastSerializer getFastSerializerFromCache(FastSerdeCache fastSerdeC
   protected FastSerializer getRegularAvroImpl(Schema schema) {
     return new FastSerdeCache.FastSerializerWithAvroGenericImpl<>(schema);
   }
+
+  private static boolean isFastSerializer(FastSerializer serializer) {
+    return !(serializer instanceof FastSerdeCache.FastSerializerWithAvroSpecificImpl
+        || serializer instanceof FastSerdeCache.FastSerializerWithAvroGenericImpl);
+  }
+
+  /**
+   * Return a flag to indicate whether fast serializer is being used or not.
+   * @return {@code true} once {@code write} has cached a serializer that is not one of the regular-Avro wrapper implementations; {@code false} otherwise
+   */
+  public boolean isFastSerializerUsed() {
+    if (cachedFastSerializer == null) {
+      return false;
+    }
+    return isFastSerializer(cachedFastSerializer);
+  }
 }
diff --git a/avro-fastserde/src/test/java/com/linkedin/avro/fastserde/FastDatumReaderTest.java b/avro-fastserde/src/test/java/com/linkedin/avro/fastserde/FastDatumReaderTest.java
index 8f4325e2e..a7c66f91e 100644
--- a/avro-fastserde/src/test/java/com/linkedin/avro/fastserde/FastDatumReaderTest.java
+++ b/avro-fastserde/src/test/java/com/linkedin/avro/fastserde/FastDatumReaderTest.java
@@ -79,11 +79,16 @@ public void shouldCreateGenericDatumReader() throws IOException, InterruptedExce
     Schema recordSchema = createRecord("TestSchema", createPrimitiveUnionFieldSchema("test", Schema.Type.STRING));
     FastGenericDatumReader fastGenericDatumReader = new FastGenericDatumReader<>(recordSchema, cache);
 
+    Assert.assertFalse(fastGenericDatumReader.isFastDeserializerUsed(), "FastGenericDatumReader"
+        + " shouldn't use the fast deserializer when firstly created");
+
     GenericRecord record = new GenericData.Record(recordSchema);
     record.put("test", "test");
 
     // when
     fastGenericDatumReader.read(null, FastSerdeTestsSupport.genericDataAsDecoder(record));
+    Assert.assertFalse(fastGenericDatumReader.isFastDeserializerUsed(), "FastGenericDatumReader shouldn't"
+        + " use the fast deserializer during fast class generation");
 
     // then
     FastDeserializer fastGenericDeserializer =
@@ -96,5 +101,13 @@ public void shouldCreateGenericDatumReader() throws IOException, InterruptedExce
     Assert.assertNotEquals(2, fastGenericDeserializer.getClass().getDeclaredMethods().length);
     Assert.assertEquals(new Utf8("test"),
         fastGenericDatumReader.read(null, FastSerdeTestsSupport.genericDataAsDecoder(record)).get("test"));
+
+    // Block fast class generation
+    cache.buildFastGenericDeserializer(recordSchema, recordSchema);
+    // Run the de-serialization again
+    Assert.assertEquals(new Utf8("test"),
+        fastGenericDatumReader.read(null, FastSerdeTestsSupport.genericDataAsDecoder(record)).get("test"));
+    Assert.assertTrue(fastGenericDatumReader.isFastDeserializerUsed(), "FastGenericDatumReader should be using"
+        + " Fast Deserializer when the fast deserializer generation is done.");
   }
 }
diff --git a/avro-fastserde/src/test/java/com/linkedin/avro/fastserde/FastDatumWriterTest.java b/avro-fastserde/src/test/java/com/linkedin/avro/fastserde/FastDatumWriterTest.java
index 895f55f2f..678499013 100644
--- a/avro-fastserde/src/test/java/com/linkedin/avro/fastserde/FastDatumWriterTest.java
+++ b/avro-fastserde/src/test/java/com/linkedin/avro/fastserde/FastDatumWriterTest.java
@@ -8,6 +8,7 @@
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.util.Utf8;
 import org.testng.Assert;
 import org.testng.annotations.BeforeTest;
 import org.testng.annotations.Test;
@@ -52,13 +53,18 @@ public void shouldCreateSpecificDatumWriter() throws IOException, InterruptedExc
   @SuppressWarnings("unchecked")
   public void shouldCreateGenericDatumReader() throws IOException, InterruptedException {
     Schema recordSchema = createRecord("TestSchema", createPrimitiveUnionFieldSchema("test", Schema.Type.STRING));
-    FastGenericDatumWriter fastGenericDatumReader = new FastGenericDatumWriter<>(recordSchema, cache);
+    FastGenericDatumWriter fastGenericDatumWriter = new FastGenericDatumWriter<>(recordSchema, cache);
+
+    Assert.assertFalse(fastGenericDatumWriter.isFastSerializerUsed(), "FastGenericDatumWriter"
+        + " shouldn't use the fast serializer when firstly created");
 
     GenericRecord record = new GenericData.Record(recordSchema);
     record.put("test", "test");
 
     // when
-    fastGenericDatumReader.write(record, AvroCompatibilityHelper.newBinaryEncoder(new ByteArrayOutputStream(), true, null));
+    fastGenericDatumWriter.write(record, AvroCompatibilityHelper.newBinaryEncoder(new ByteArrayOutputStream(), true, null));
+    Assert.assertFalse(fastGenericDatumWriter.isFastSerializerUsed(), "FastGenericDatumWriter shouldn't"
+        + " use the fast serializer during fast class generation");
 
     // then
     FastSerializer fastGenericSerializer =
@@ -68,5 +74,11 @@ public void shouldCreateGenericDatumReader() throws IOException, InterruptedExce
 
     Assert.assertNotNull(fastGenericSerializer);
     Assert.assertNotEquals(2, fastGenericSerializer.getClass().getDeclaredMethods().length);
+
+    // Block fast class generation
+    cache.buildFastGenericSerializer(recordSchema);
+    fastGenericDatumWriter.write(record, AvroCompatibilityHelper.newBinaryEncoder(new ByteArrayOutputStream(), true, null));
+    Assert.assertTrue(fastGenericDatumWriter.isFastSerializerUsed(), "FastGenericDatumWriter should be using"
+        + " Fast Serializer when the fast serializer generation is done.");
   }
 }