diff --git a/fastserde/avro-fastserde-tests-common/src/test/avro/fastserdeSimpleRecord.avsc b/fastserde/avro-fastserde-tests-common/src/test/avro/fastserdeSimpleRecord.avsc new file mode 100644 index 000000000..63dbf3cf7 --- /dev/null +++ b/fastserde/avro-fastserde-tests-common/src/test/avro/fastserdeSimpleRecord.avsc @@ -0,0 +1,12 @@ +{ + "type": "record", + "name": "SimpleRecord", + "namespace": "com.linkedin.avro.fastserde.generated.avro", + "fields": [ + { + "name": "text", + "type": "string", + "default": "In vino veritas" + } + ] +} diff --git a/fastserde/avro-fastserde-tests-common/src/test/java/com/linkedin/avro/fastserde/FastSerdeCacheTest.java b/fastserde/avro-fastserde-tests-common/src/test/java/com/linkedin/avro/fastserde/FastSerdeCacheTest.java index 3bd86b12f..6a72890c6 100644 --- a/fastserde/avro-fastserde-tests-common/src/test/java/com/linkedin/avro/fastserde/FastSerdeCacheTest.java +++ b/fastserde/avro-fastserde-tests-common/src/test/java/com/linkedin/avro/fastserde/FastSerdeCacheTest.java @@ -1,11 +1,23 @@ package com.linkedin.avro.fastserde; +import com.linkedin.avro.fastserde.generated.avro.SimpleRecord; import com.linkedin.avro.fastserde.generated.avro.TestRecord; +import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper; + +import java.io.ByteArrayOutputStream; +import java.lang.reflect.Field; import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; +import java.util.function.BiFunction; + import org.apache.avro.Schema; +import org.apache.avro.io.BinaryDecoder; +import org.apache.avro.io.BinaryEncoder; +import org.apache.avro.io.DatumReader; +import org.apache.avro.io.DatumWriter; +import org.apache.avro.specific.SpecificDatumWriter; import org.testng.Assert; import org.testng.annotations.Test; @@ -20,7 +32,7 @@ public void testIsSupportedForFastDeserializer() { supportedSchemaTypes.add(Schema.Type.ARRAY); Map schemaTypes = new HashMap<>(); - /** + /* * Those types could be created by {@link Schema#create(org.apache.avro.Schema.Type)} function. */ schemaTypes.put(Schema.Type.RECORD, Schema.parse("{\"type\": \"record\", \"name\": \"test\", \"fields\":[]}")); @@ -68,4 +80,81 @@ public void testBuildFastSpecificDeserializerWithCorrectClasspath() { FastSerdeCache cache = FastSerdeCache.getDefaultInstance(); cache.buildFastSpecificDeserializer(TestRecord.SCHEMA$, TestRecord.SCHEMA$); } + + @Test(groups = "serializationTest", timeOut = 5_000L, + expectedExceptions = UnsupportedOperationException.class, + expectedExceptionsMessageRegExp = "Fast specific serializer could not be generated.") + public void testSpecificSerializationFailsFast() throws Exception { + serializationShouldFailFast(FastSpecificDatumWriter::new); + } + + @Test(groups = "serializationTest", timeOut = 5_000L, + expectedExceptions = UnsupportedOperationException.class, + expectedExceptionsMessageRegExp = "Fast generic serializer could not be generated.") + public void testGenericSerializationFailsFast() throws Exception { + serializationShouldFailFast(FastGenericDatumWriter::new); + } + + @Test(groups = "deserializationTest", timeOut = 5_000L, + expectedExceptions = UnsupportedOperationException.class, + expectedExceptionsMessageRegExp = "Fast specific deserializer could not be generated.") + public void testSpecificDeserializationFailsFast() throws Exception { + deserializationShouldFailFast(FastSpecificDatumReader::new); + } + + @Test(groups = "deserializationTest", timeOut = 5_000L, + expectedExceptions = UnsupportedOperationException.class, + expectedExceptionsMessageRegExp = "Fast generic deserializer could not be generated.") + public void testGenericDeserializationFailsFast() throws Exception { + deserializationShouldFailFast(FastGenericDatumReader::new); + } + + private void serializationShouldFailFast( + BiFunction> datumWriterFactory) throws Exception { + // given: + SimpleRecord data = new SimpleRecord(); + data.put(0, "Veni, vidi, vici."); + FastSerdeCache cache = createCacheWithoutClassLoader(); + DatumWriter writer = datumWriterFactory.apply(data.getSchema(), cache); + + int i = 0; + while (++i <= 100) { + BinaryEncoder encoder = AvroCompatibilityHelper.newBinaryEncoder(new ByteArrayOutputStream()); + // should throw exception (except 1st iteration when fallback writer is always used) + writer.write(data, encoder); + Thread.sleep(50L); + } + } + + private void deserializationShouldFailFast( + BiFunction> datumReaderFactory) throws Exception { + // given + SimpleRecord data = new SimpleRecord(); + data.put(0, "Omnes una manet nox."); + FastSerdeCache cache = createCacheWithoutClassLoader(); + + SpecificDatumWriter specificDatumWriter = new SpecificDatumWriter<>(data.getSchema()); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + BinaryEncoder encoder = AvroCompatibilityHelper.newBinaryEncoder(baos); + specificDatumWriter.write(data, encoder); + encoder.flush(); + + DatumReader datumReader = datumReaderFactory.apply(data.getSchema(), cache); + + int i = 0; + while (++i <= 100) { + BinaryDecoder decoder = AvroCompatibilityHelper.newBinaryDecoder(baos.toByteArray()); + // should throw exception (except 1st iteration when fallback reader is always used) + datumReader.read(null, decoder); + Thread.sleep(50L); + } + } + + private FastSerdeCache createCacheWithoutClassLoader() throws IllegalAccessException, NoSuchFieldException { + FastSerdeCache cache = new FastSerdeCache(null, null, true); + Field classLoaderField = cache.getClass().getDeclaredField("classLoader"); + classLoaderField.setAccessible(true); + classLoaderField.set(cache, null); // so that an exception is thrown while compiling generated class + return cache; + } } diff --git a/fastserde/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/FastDeserializerGenerator.java b/fastserde/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/FastDeserializerGenerator.java index 85c2ca3c9..03fed9a8f 100644 --- a/fastserde/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/FastDeserializerGenerator.java +++ b/fastserde/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/FastDeserializerGenerator.java @@ -52,7 +52,6 @@ import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.IndexedRecord; import org.apache.avro.io.Decoder; -import org.apache.avro.specific.SpecificData; import org.apache.avro.util.Utf8; import org.apache.commons.lang3.StringUtils; diff --git a/fastserde/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/FastGenericDatumReader.java b/fastserde/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/FastGenericDatumReader.java index 05e9666ad..09b9f6ec3 100644 --- a/fastserde/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/FastGenericDatumReader.java +++ b/fastserde/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/FastGenericDatumReader.java @@ -87,9 +87,7 @@ public T read(T reuse, Decoder in) throws IOException { fastDeserializer = cachedFastDeserializer.get(); } else { fastDeserializer = getFastDeserializerFromCache(cache, writerSchema, readerSchema, modelData); - if (!FastSerdeCache.isFastDeserializer(fastDeserializer)) { - // don't cache - } else { + if (FastSerdeCache.isFastDeserializer(fastDeserializer)) { cachedFastDeserializer.compareAndSet(null, fastDeserializer); if (LOGGER.isDebugEnabled()) { LOGGER.debug("FastGenericDeserializer was generated and cached for reader schema: [" diff --git a/fastserde/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/FastSerdeBase.java b/fastserde/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/FastSerdeBase.java index f1c721048..bf1dde9c3 100644 --- a/fastserde/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/FastSerdeBase.java +++ b/fastserde/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/FastSerdeBase.java @@ -2,15 +2,12 @@ import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelperCommon; import com.sun.codemodel.JBlock; -import com.sun.codemodel.JClass; import com.sun.codemodel.JCodeModel; import com.sun.codemodel.JConditional; import com.sun.codemodel.JDefinedClass; import com.sun.codemodel.JExpr; import com.sun.codemodel.JExpression; import com.sun.codemodel.JFieldRef; -import com.sun.codemodel.JFieldVar; -import com.sun.codemodel.JMethod; import com.sun.codemodel.JMod; import com.sun.codemodel.JVar; @@ -28,7 +25,6 @@ import org.apache.avro.LogicalType; import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; -import org.apache.avro.specific.SpecificData; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/fastserde/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/FastSerdeCache.java b/fastserde/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/FastSerdeCache.java index 9041e8dc0..bcf817acc 100644 --- a/fastserde/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/FastSerdeCache.java +++ b/fastserde/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/FastSerdeCache.java @@ -11,7 +11,6 @@ import java.nio.file.Files; import java.nio.file.Path; import java.util.Map; -import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import java.util.concurrent.Executors; @@ -31,6 +30,7 @@ import org.apache.avro.io.Encoder; import org.apache.avro.specific.SpecificData; import org.apache.avro.specific.SpecificDatumReader; +import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,6 +46,8 @@ public final class FastSerdeCache { public static final String CLASSPATH = "avro.fast.serde.classpath"; public static final String CLASSPATH_SUPPLIER = "avro.fast.serde.classpath.supplier"; + public static final String FAIL_FAST = "avro.fast.serde.failfast"; + public static final String FAIL_FAST_SUPPLIER = "avro.fast.serde.failfast.supplier"; private static final Logger LOGGER = LoggerFactory.getLogger(FastSerdeCache.class); @@ -61,12 +63,13 @@ public final class FastSerdeCache { private final Map> fastGenericRecordSerializersCache = new FastAvroConcurrentHashMap<>(); - private Executor executor; + private final Executor executor; - private File classesDir; - private ClassLoader classLoader; + private final File classesDir; + private final ClassLoader classLoader; - private Optional compileClassPath; + private final String compileClassPath; + private final boolean failFast; /** * @@ -74,7 +77,7 @@ public final class FastSerdeCache { * custom classpath {@link Supplier} */ public FastSerdeCache(Supplier compileClassPathSupplier) { - this(compileClassPathSupplier != null ? compileClassPathSupplier.get() : null); + this(null, compileClassPathSupplier != null ? compileClassPathSupplier.get() : null); } /** @@ -89,44 +92,45 @@ public FastSerdeCache(Executor executorService, Supplier compileClassPat } public FastSerdeCache(String compileClassPath) { - this(); - this.compileClassPath = Optional.ofNullable(compileClassPath); + this(null, compileClassPath); } /** * * @param executorService * customized {@link Executor} used by serializer/deserializer compile threads - * @param compileClassPath - * custom classpath as string */ + public FastSerdeCache(Executor executorService) { + this(executorService, (String) null); + } + public FastSerdeCache(Executor executorService, String compileClassPath) { - this(executorService); - this.compileClassPath = Optional.ofNullable(compileClassPath); + this(executorService, compileClassPath, false); } /** - * * @param executorService * customized {@link Executor} used by serializer/deserializer compile threads + * @param compileClassPath + * custom classpath as string + * @param failFast + * 'true' indicates generating always-failing Fast(de-)Serializer if fast-serde class couldn't be generated + * (e.g. due to compilation error) */ - public FastSerdeCache(Executor executorService) { + public FastSerdeCache(Executor executorService, String compileClassPath, boolean failFast) { this.executor = executorService != null ? executorService : getDefaultExecutor(); try { Path classesPath = Files.createTempDirectory("generated"); classesDir = classesPath.toFile(); classLoader = - URLClassLoader.newInstance(new URL[]{classesDir.toURI().toURL()}, FastSerdeCache.class.getClassLoader()); + URLClassLoader.newInstance(new URL[]{classesDir.toURI().toURL()}, FastSerdeCache.class.getClassLoader()); } catch (IOException e) { throw new RuntimeException(e); } - this.compileClassPath = Optional.empty(); - } - - private FastSerdeCache() { - this((Executor) null); + this.compileClassPath = compileClassPath; + this.failFast = failFast; } /** @@ -139,38 +143,43 @@ public static FastSerdeCache getDefaultInstance() { if (_INSTANCE == null) { synchronized (FastSerdeCache.class) { if (_INSTANCE == null) { - String classPath = System.getProperty(CLASSPATH); - String classpathSupplierClassName = System.getProperty(CLASSPATH_SUPPLIER); - if (classpathSupplierClassName != null) { - Supplier classpathSupplier = null; - try { - Class classPathSupplierClass = Class.forName(classpathSupplierClassName); - if (Supplier.class.isAssignableFrom(classPathSupplierClass) && String.class.equals( - ((ParameterizedType) classPathSupplierClass.getGenericSuperclass()).getActualTypeArguments()[0])) { - - classpathSupplier = (Supplier) classPathSupplierClass.newInstance(); - } else { - LOGGER.warn( - "classpath supplier must be subtype of java.util.function.Supplier: " + classpathSupplierClassName); - } - } catch (ReflectiveOperationException e) { - LOGGER.warn("unable to instantiate classpath supplier: " + classpathSupplierClassName, e); - } - _INSTANCE = new FastSerdeCache(classpathSupplier); - } else if (classPath != null) { - _INSTANCE = new FastSerdeCache(classPath); - } else { - /** - * The fast-class generator will figure out the compile dependencies during fast-class generation. - */ - _INSTANCE = new FastSerdeCache(""); - } + String compileClassPath = resolveSystemProperty(CLASSPATH_SUPPLIER, CLASSPATH); + String failFast = resolveSystemProperty(FAIL_FAST_SUPPLIER, FAIL_FAST); + /* + * The fast-class generator will figure out the compile dependencies during fast-class generation. + * `compileClassPath` extends above findings, e.g. may provide jar with custom conversions for logical types. + */ + _INSTANCE = new FastSerdeCache(null, compileClassPath, Boolean.parseBoolean(failFast)); } } } return _INSTANCE; } + private static String resolveSystemProperty(String supplierPropertyName, String propertyName) { + String supplierClassName = System.getProperty(supplierPropertyName); + + if (StringUtils.isNotBlank(supplierClassName)) { + Supplier supplier = null; + try { + Class supplierClass = Class.forName(supplierClassName); + if (Supplier.class.isAssignableFrom(supplierClass) && String.class.equals( + ((ParameterizedType) supplierClass.getGenericSuperclass()).getActualTypeArguments()[0])) { + + supplier = (Supplier) supplierClass.getConstructor().newInstance(); + } else { + LOGGER.warn("Supplier must be subtype of java.util.function.Supplier: " + supplierClassName); + } + } catch (ReflectiveOperationException e) { + LOGGER.warn("Unable to instantiate supplier: " + supplierClassName, e); + } + + return supplier != null ? supplier.get() : null; + } + + return System.getProperty(propertyName); + } + public static boolean isSupportedForFastDeserializer(Schema.Type readerSchemaType) { return readerSchemaType.equals(Schema.Type.RECORD) || readerSchemaType.equals(Schema.Type.MAP) || readerSchemaType.equals(Schema.Type.ARRAY); @@ -365,7 +374,7 @@ public CompletableFuture> getFastSpecificDeserializerAsync(S * @see #getFastGenericDeserializerAsync(Schema, Schema, GenericData) */ public CompletableFuture> getFastGenericDeserializerAsync(Schema writerSchema, Schema readerSchema) { - return getFastGenericDeserializerAsync(writerSchema, readerSchema); + return getFastGenericDeserializerAsync(writerSchema, readerSchema, null); } /** @@ -416,8 +425,7 @@ public FastDeserializer buildFastSpecificDeserializer(Schema writerSchema, Sc */ public FastDeserializer buildFastSpecificDeserializer(Schema writerSchema, Schema readerSchema, SpecificData modelData) { FastSpecificDeserializerGenerator generator = - new FastSpecificDeserializerGenerator<>(writerSchema, readerSchema, classesDir, classLoader, - compileClassPath.orElse(null), modelData); + new FastSpecificDeserializerGenerator<>(writerSchema, readerSchema, classesDir, classLoader, compileClassPath, modelData); FastDeserializer fastDeserializer = generator.generateDeserializer(); if (LOGGER.isDebugEnabled()) { @@ -447,17 +455,20 @@ private FastDeserializer buildSpecificDeserializer(Schema writerSchema, Schem try { return buildFastSpecificDeserializer(writerSchema, readerSchema, modelData); } catch (FastDeserializerGeneratorException e) { - LOGGER.warn("Deserializer generation exception when generating specific FastDeserializer for writer schema: " + LOGGER.error("Deserializer generation exception when generating specific FastDeserializer for writer schema: " + "[\n{}\n] and reader schema: [\n{}\n]", writerSchema.toString(true), readerSchema.toString(true), e); } catch (Exception e) { - LOGGER.warn("Deserializer class instantiation exception", e); + LOGGER.error("Deserializer class instantiation exception", e); } return new FastDeserializer() { - private DatumReader datumReader = new SpecificDatumReader<>(writerSchema, readerSchema); + private DatumReader datumReader = AvroCompatibilityHelper.newSpecificDatumReader(writerSchema, readerSchema, modelData); @Override public Object deserialize(Object reuse, Decoder d) throws IOException { + if (failFast) { + throw new UnsupportedOperationException("Fast specific deserializer could not be generated."); + } return datumReader.read(reuse, d); } }; @@ -481,8 +492,7 @@ public FastDeserializer buildFastGenericDeserializer(Schema writerSchema, Sch */ public FastDeserializer buildFastGenericDeserializer(Schema writerSchema, Schema readerSchema, GenericData modelData) { FastGenericDeserializerGenerator generator = - new FastGenericDeserializerGenerator<>(writerSchema, readerSchema, classesDir, classLoader, - compileClassPath.orElse(null), modelData); + new FastGenericDeserializerGenerator<>(writerSchema, readerSchema, classesDir, classLoader, compileClassPath, modelData); FastDeserializer fastDeserializer = generator.generateDeserializer(); @@ -513,17 +523,20 @@ private FastDeserializer buildGenericDeserializer(Schema writerSchema, Schema try { return buildFastGenericDeserializer(writerSchema, readerSchema, modelData); } catch (FastDeserializerGeneratorException e) { - LOGGER.warn("Deserializer generation exception when generating generic FastDeserializer for writer schema: [\n" + LOGGER.error("Deserializer generation exception when generating generic FastDeserializer for writer schema: [\n" + writerSchema.toString(true) + "\n] and reader schema:[\n" + readerSchema.toString(true) + "\n]", e); } catch (Exception e) { - LOGGER.warn("Deserializer class instantiation exception:" + e); + LOGGER.error("Deserializer class instantiation exception:", e); } return new FastDeserializer() { - private DatumReader datumReader = new GenericDatumReader<>(writerSchema, readerSchema, modelData); + private DatumReader datumReader = AvroCompatibilityHelper.newGenericDatumReader(writerSchema, readerSchema, modelData); @Override public Object deserialize(Object reuse, Decoder d) throws IOException { + if (failFast) { + throw new UnsupportedOperationException("Fast generic deserializer could not be generated."); + } return datumReader.read(reuse, d); } }; @@ -540,7 +553,7 @@ public FastSerializer buildFastSpecificSerializer(Schema schema, SpecificData Utils.getAvroVersionsSupportedForSerializer()); } FastSpecificSerializerGenerator generator = - new FastSpecificSerializerGenerator<>(schema, classesDir, classLoader, compileClassPath.orElse(null), modelData); + new FastSpecificSerializerGenerator<>(schema, classesDir, classLoader, compileClassPath, modelData); if (LOGGER.isDebugEnabled()) { LOGGER.debug("Generated classes dir: {} and generation of specific FastSerializer is done for schema of type: {}" + @@ -560,10 +573,10 @@ private FastSerializer buildSpecificSerializer(Schema schema, SpecificData mo try { return buildFastSpecificSerializer(schema, modelData); } catch (FastDeserializerGeneratorException e) { - LOGGER.warn("Serializer generation exception when generating specific FastSerializer for schema: [\n{}\n]", + LOGGER.error("Serializer generation exception when generating specific FastSerializer for schema: [\n{}\n]", schema.toString(true), e); } catch (Exception e) { - LOGGER.warn("Serializer class instantiation exception", e); + LOGGER.error("Serializer class instantiation exception", e); } } @@ -573,6 +586,9 @@ private FastSerializer buildSpecificSerializer(Schema schema, SpecificData mo @Override public void serialize(Object data, Encoder e) throws IOException { + if (failFast) { + throw new UnsupportedOperationException("Fast specific serializer could not be generated."); + } datumWriter.write(data, e); } }; @@ -589,7 +605,7 @@ public FastSerializer buildFastGenericSerializer(Schema schema, GenericData m + Utils.getAvroVersionsSupportedForSerializer()); } FastGenericSerializerGenerator generator = - new FastGenericSerializerGenerator<>(schema, classesDir, classLoader, compileClassPath.orElse(null), modelData); + new FastGenericSerializerGenerator<>(schema, classesDir, classLoader, compileClassPath, modelData); if (LOGGER.isDebugEnabled()) { LOGGER.debug("Generated classes dir: {} and generation of generic FastSerializer is done for schema of type: {}" + @@ -609,10 +625,10 @@ private FastSerializer buildGenericSerializer(Schema schema, GenericData mode try { return buildFastGenericSerializer(schema, modelData); } catch (FastDeserializerGeneratorException e) { - LOGGER.warn("Serializer generation exception when generating generic FastSerializer for schema: [\n{}\n]", + LOGGER.error("Serializer generation exception when generating generic FastSerializer for schema: [\n{}\n]", schema.toString(true), e); } catch (Exception e) { - LOGGER.warn("Serializer class instantiation exception", e); + LOGGER.error("Serializer class instantiation exception", e); } } @@ -622,6 +638,9 @@ private FastSerializer buildGenericSerializer(Schema schema, GenericData mode @Override public void serialize(Object data, Encoder e) throws IOException { + if (failFast) { + throw new UnsupportedOperationException("Fast generic serializer could not be generated."); + } datumWriter.write(data, e); } }; diff --git a/settings.gradle b/settings.gradle index 0151c5bbe..b0913dba9 100644 --- a/settings.gradle +++ b/settings.gradle @@ -87,6 +87,7 @@ include 'demos:spotbugs-demo' include 'fastserde:avro-fastserde' include 'fastserde:avro-fastserde-jmh' +include 'fastserde:avro-fastserde-tests-common' include 'fastserde:avro-fastserde-tests14' include 'fastserde:avro-fastserde-tests15' include 'fastserde:avro-fastserde-tests16' @@ -101,4 +102,4 @@ gradleEnterprise { termsOfServiceUrl = "https://gradle.com/terms-of-service" termsOfServiceAgree = "yes" } -} \ No newline at end of file +}