diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/ReflectionScanner.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/ReflectionScanner.java index 9289162b6386..192d0391a0bb 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/ReflectionScanner.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/ReflectionScanner.java @@ -28,17 +28,23 @@ import org.reflections.Configuration; import org.reflections.Reflections; import org.reflections.ReflectionsException; +import org.reflections.Store; import org.reflections.scanners.SubTypesScanner; import org.reflections.util.ConfigurationBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.lang.reflect.Field; import java.net.URL; +import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; +import java.util.Map; import java.util.ServiceLoader; import java.util.Set; import java.util.SortedSet; import java.util.TreeSet; +import java.util.concurrent.ConcurrentHashMap; /** * A {@link PluginScanner} implementation which uses reflection and {@link ServiceLoader} to discover plugins. @@ -159,5 +165,61 @@ protected void scan(URL url) { } } } + + @Override + protected void scan() { + // The super constructor instantiates a store to be used in this method. + // Replace the store with a custom subclass, then call the normal scan to use the custom store. + store = new InternalStore(); + super.scan(); + } + } + + private static class InternalStore extends Store { + + private static final Field STORE_MAP_FIELD = storeMapField(); + private final ConcurrentHashMap>> storeMap; + + private static Field storeMapField() { + try { + Field field = Store.class.getDeclaredField("storeMap"); + field.setAccessible(true); + return field; + } catch (Throwable e) { + log.error("Unable to access org.reflections.Store#storeMap, falling back to default behavior", e); + return null; + } + } + + public InternalStore() { + super(); + storeMap = storeMap(); + } + + @SuppressWarnings("unchecked") + private ConcurrentHashMap>> storeMap() { + if (STORE_MAP_FIELD == null) { + return null; + } + try { + return (ConcurrentHashMap>>) STORE_MAP_FIELD.get(this); + } catch (Throwable e) { + log.error("Unable to access org.reflections.Store#storeMap, falling back to default behavior", e); + return null; + } + } + + @Override + public boolean put(String index, String key, String value) { + if (storeMap != null) { + // When Reflections is used for parallel scans, it has a bug where concurrent calls to add() cause + // nulls to appear in the non-thread-safe ArrayList. Override in order to insert synchronized ArrayLists. + return storeMap.computeIfAbsent(index, s -> new ConcurrentHashMap<>()) + .computeIfAbsent(key, s -> Collections.synchronizedList(new ArrayList<>())) + .add(value); + } else { + return super.put(index, key, value); + } + } } }