Skip to content

Commit

Permalink
merge: #7150
Browse files Browse the repository at this point in the history
7150: Fix issues with class loading for external exporters relying on the Thread's context class loader r=npepinpe a=npepinpe

## Description

This PR fixes issues when an external exporter (i.e. a self-contained JAR) wants to load classes via the current thread's context class loader. This occurred, for example, in the Kafka exporter, as the Kafka library will use the current thread's context class loader instead of just the current class loader. The fix is to wrap every exporter call and set/reset the context class loader before/after.

Part of this PR brings improvements to the exporter tests, mostly relating to external exporters. It introduces ByteBuddy as a simpler alternative to building external exporters: ByteBuddy is a battle tested library which gives you an easy to use API to implement classes on the fly and package them as JARs. Additionally, by using this instead of pre-made classes and building JARs out of them, we can guarantee that the class ByteBuddy creates does not exist in the current class loader, which increases the robustness of our tests.

This PR also introduces junit5 to the broker and migrates the modified tests to it, as part of our long term goals to migrate from junit4 to junit5.

I recommend the reviewer to check commit-by-commit, but keep in mind that the last commit is the "goal" of the PR, and all the work prior to it is essentially building up to it. I'm also happy to split this into multiple PR if that's easier to review instead, don't hesitate to ask! I hesitated on this as I felt it was important to keep the context, but in the end I think the reviewer will have better insight in what's easier for them.

I have some worries about the PR, in that while using ByteBuddy is probably more reliable than writing out own code to churn out classes on the fly and create JARs for them, it's also a little "arcane" if you've never used it, and I fear the test is maybe more complex than desired. I'd be happy to hear thoughts on how we could avoid this complexity while still testing the functionality. Although working on this really impressed on me that we may want to rethink exporters, and instead of running arbitrary code in the broker, we may just want to provide an API to get data out.

## Related issues

closes #4196 



Co-authored-by: Nicolas Pepin-Perreault <nicolas.pepin-perreault@camunda.com>
  • Loading branch information
zeebe-bors-cloud[bot] and npepinpe authored Jun 2, 2021
2 parents eda7a26 + ee8a953 commit 381f201
Show file tree
Hide file tree
Showing 19 changed files with 637 additions and 385 deletions.
23 changes: 17 additions & 6 deletions broker/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,18 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
Expand Down Expand Up @@ -180,12 +192,6 @@
<artifactId>gson</artifactId>
</dependency>

<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.camunda</groupId>
<artifactId>zeebe-workflow-engine</artifactId>
Expand Down Expand Up @@ -240,6 +246,11 @@
<artifactId>spring-boot-actuator</artifactId>
</dependency>

<dependency>
<groupId>net.bytebuddy</groupId>
<artifactId>byte-buddy</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import io.camunda.zeebe.broker.Loggers;
import io.camunda.zeebe.broker.system.monitoring.BrokerStepMetrics;
import io.camunda.zeebe.util.CheckedRunnable;
import java.util.ArrayList;
import java.util.List;
import org.slf4j.Logger;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,19 @@
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Provides a class loader which isolates external exporters from other exporters, while exposing
* our own code to ensure versions match at runtime.
*/
public final class ExporterJarClassLoader extends URLClassLoader {
private static final Logger LOGGER = LoggerFactory.getLogger(ExporterJarClassLoader.class);

private static final String JAVA_PACKAGE_PREFIX = "java.";
private static final String JAR_URL_FORMAT = "jar:%s!/";

/** lists of packages from broker base that are exposed at runtime to the external exporters */
private static final String[] EXPOSED_PACKAGE_PREFIXES =
new String[] {"io.camunda.zeebe.exporter.api", "org.slf4j.", "org.apache.logging.log4j."};

private ExporterJarClassLoader(final URL[] urls) {
super(urls);
}
Expand All @@ -48,30 +48,17 @@ public Class<?> loadClass(final String name) throws ClassNotFoundException {
return findSystemClass(name);
}

if (isProvidedByBroker(name)) {
return getSystemClassLoader().loadClass(name);
}

Class<?> clazz = findLoadedClass(name);
if (clazz == null) {
try {
clazz = findClass(name);
} catch (final ClassNotFoundException ex) {
LOGGER.trace("Failed to load class {}, falling back to parent class loader", name, ex);
clazz = super.loadClass(name);
}
}

return clazz;
}
}

private boolean isProvidedByBroker(final String name) {
for (final String prefix : EXPOSED_PACKAGE_PREFIXES) {
if (name.startsWith(prefix)) {
return true;
}
}

return false;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under
* one or more contributor license agreements. See the NOTICE file distributed
* with this work for additional information regarding copyright ownership.
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.broker.exporter.jar;

import io.camunda.zeebe.util.CheckedRunnable;
import org.agrona.LangUtil;

/**
* A collection of utilities to run an arbitrary {@link Runnable} with a specific thread context
* class loader. This is required when side loading external code via the {@link
* ExporterJarClassLoader}, as that code may be using the {@link Thread#getContextClassLoader()}.
*
* <p>As the same thread may be reused, it's also important to reset the thread afterwards to avoid
* operations being run on the wrong class loader.
*/
public final class ThreadContextUtil {

/**
* Executes the given {@code runnable}, swapping the thread context class loader for the given
* class loader, and swapping it back with the previous class loader afterwards.
*
* @param runnable the operation to execute
* @param classLoader the class loader to temporarily assign to the current thread's context class
* loader
*/
public static void runWithClassLoader(final Runnable runnable, final ClassLoader classLoader) {
try {
runCheckedWithClassLoader(runnable::run, classLoader);
} catch (final Exception e) {
LangUtil.rethrowUnchecked(e);
}
}

/**
* Executes the given {@code runnable}, swapping the thread context class loader for the given
* class loader, and swapping it back with the previous class loader afterwards.
*
* <p>Use this method if you want your operation to throw exceptions; the class loader is
* guaranteed to be reset even if an exception is thrown.
*
* @param runnable the operation to execute
* @param classLoader the class loader to temporarily assign to the current thread's context class
* loader
*/
public static void runCheckedWithClassLoader(
final CheckedRunnable runnable, final ClassLoader classLoader) throws Exception {
final var currentThread = Thread.currentThread();
final var contextClassLoader = currentThread.getContextClassLoader();

try {
currentThread.setContextClassLoader(classLoader);
runnable.run();
} finally {
currentThread.setContextClassLoader(contextClassLoader);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import io.camunda.zeebe.broker.exporter.context.ExporterConfiguration;
import io.camunda.zeebe.exporter.api.Exporter;
import java.lang.reflect.InvocationTargetException;
import java.util.Map;

public class ExporterDescriptor {
Expand All @@ -25,8 +26,11 @@ public ExporterDescriptor(

public Exporter newInstance() throws ExporterInstantiationException {
try {
return exporterClass.newInstance();
} catch (final InstantiationException | IllegalAccessException e) {
return exporterClass.getDeclaredConstructor().newInstance();
} catch (final InstantiationException
| IllegalAccessException
| NoSuchMethodException
| InvocationTargetException e) {
throw new ExporterInstantiationException(getId(), e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import io.camunda.zeebe.broker.exporter.context.ExporterContext;
import io.camunda.zeebe.broker.exporter.jar.ExporterJarLoadException;
import io.camunda.zeebe.broker.exporter.jar.ExporterJarRepository;
import io.camunda.zeebe.broker.exporter.jar.ThreadContextUtil;
import io.camunda.zeebe.broker.system.configuration.ExporterCfg;
import io.camunda.zeebe.exporter.api.Exporter;
import java.util.Collections;
Expand Down Expand Up @@ -88,7 +89,9 @@ private void validate(final ExporterDescriptor descriptor) throws ExporterLoadEx
try {
final Exporter instance = descriptor.newInstance();
final ExporterContext context = new ExporterContext(LOG, descriptor.getConfiguration());
instance.configure(context);

ThreadContextUtil.runCheckedWithClassLoader(
() -> instance.configure(context), instance.getClass().getClassLoader());
} catch (final Exception ex) {
throw new ExporterLoadException(descriptor.getId(), "failed validation", ex);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import io.camunda.zeebe.broker.Loggers;
import io.camunda.zeebe.broker.exporter.context.ExporterContext;
import io.camunda.zeebe.broker.exporter.jar.ThreadContextUtil;
import io.camunda.zeebe.broker.exporter.repo.ExporterDescriptor;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecord;
import io.camunda.zeebe.exporter.api.Exporter;
Expand All @@ -21,6 +22,7 @@
import java.time.Duration;
import org.slf4j.Logger;

@SuppressWarnings("java:S112") // allow generic exception when calling Exporter#configure
final class ExporterContainer implements Controller {

private static final Logger LOG = Loggers.EXPORTER_LOGGER;
Expand Down Expand Up @@ -61,7 +63,8 @@ void initPosition() {

void openExporter() {
LOG.debug("Open exporter with id '{}'", getId());
exporter.open(this);
ThreadContextUtil.runWithClassLoader(
() -> exporter.open(this), exporter.getClass().getClassLoader());
}

public ExporterContext getContext() {
Expand Down Expand Up @@ -128,7 +131,8 @@ private boolean acceptRecord(final RecordMetadata metadata) {

void configureExporter() throws Exception {
LOG.debug("Configure exporter with id '{}'", getId());
exporter.configure(context);
ThreadContextUtil.runCheckedWithClassLoader(
() -> exporter.configure(context), exporter.getClass().getClassLoader());
}

boolean exportRecord(final RecordMetadata rawMetadata, final TypedRecord typedEvent) {
Expand All @@ -148,13 +152,15 @@ boolean exportRecord(final RecordMetadata rawMetadata, final TypedRecord typedEv
}

private void export(final Record<?> record) {
exporter.export(record);
ThreadContextUtil.runWithClassLoader(
() -> exporter.export(record), exporter.getClass().getClassLoader());
lastUnacknowledgedPosition = record.getPosition();
}

public void close() {
try {
exporter.close();
ThreadContextUtil.runCheckedWithClassLoader(
exporter::close, exporter.getClass().getClassLoader());
} catch (final Exception e) {
context.getLogger().error("Error on close", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,87 +9,48 @@

import static org.assertj.core.api.Assertions.assertThat;

import io.camunda.zeebe.broker.exporter.util.JarCreatorRule;
import io.camunda.zeebe.broker.exporter.util.TestJarExporter;
import io.camunda.zeebe.broker.exporter.util.ExternalExporter;
import io.camunda.zeebe.exporter.api.Exporter;
import java.io.File;
import org.apache.logging.log4j.LogManager;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.RuleChain;
import org.junit.rules.TemporaryFolder;
import java.io.IOException;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.api.parallel.Execution;
import org.junit.jupiter.api.parallel.ExecutionMode;
import org.slf4j.Logger;

public final class ExporterJarClassLoaderTest {
private final TemporaryFolder temporaryFolder = new TemporaryFolder();
private final JarCreatorRule jarCreator = new JarCreatorRule(temporaryFolder);

@Rule public RuleChain chain = RuleChain.outerRule(temporaryFolder).around(jarCreator);

@Test
public void shouldLoadClassesPackagedInJar() throws Exception {
final Class exportedClass = TestJarExporter.class;
final File jarFile = jarCreator.create(exportedClass);
final ExporterJarClassLoader classLoader = ExporterJarClassLoader.ofPath(jarFile.toPath());

// when
final Class<?> loadedClass = classLoader.loadClass(exportedClass.getCanonicalName());

// then
assertThat(loadedClass).isNotEqualTo(exportedClass);
assertThat(loadedClass.getDeclaredField("FOO").get(loadedClass)).isEqualTo(TestJarExporter.FOO);
assertThat(loadedClass.newInstance()).isInstanceOf(Exporter.class);
}

@Test
public void shouldLoadSystemClassesFromSystemClassLoader() throws Exception {
final Class exportedClass = TestJarExporter.class;
final File jarFile = jarCreator.create(exportedClass);
final ExporterJarClassLoader classLoader = ExporterJarClassLoader.ofPath(jarFile.toPath());

// when
final Class loadedClass = classLoader.loadClass(String.class.getCanonicalName());

// then
assertThat(loadedClass).isEqualTo(String.class);
}
@Execution(ExecutionMode.CONCURRENT)
final class ExporterJarClassLoaderTest {

@Test
public void shouldLoadZbExporterClassesFromSystemClassLoader() throws Exception {
final Class exportedClass = TestJarExporter.class;
final File jarFile = jarCreator.create(exportedClass);
final ExporterJarClassLoader classLoader = ExporterJarClassLoader.ofPath(jarFile.toPath());
void shouldLoadClassesPackagedInJar(final @TempDir File tempDir) throws Exception {
final var exporterClass = ExternalExporter.createUnloadedExporterClass();
final var jarFile = exporterClass.toJar(new File(tempDir, "exporter.jar"));
final var classLoader = ExporterJarClassLoader.ofPath(jarFile.toPath());

// when
final Class loadedClass = classLoader.loadClass(Exporter.class.getCanonicalName());
final var loadedClass = classLoader.loadClass(ExternalExporter.EXPORTER_CLASS_NAME);

// then
assertThat(loadedClass).isEqualTo(Exporter.class);
final var constructor = loadedClass.getConstructor();
assertThat(loadedClass.getDeclaredField("FOO").get(loadedClass)).isEqualTo("bar");
assertThat(constructor.newInstance()).isInstanceOf(Exporter.class);
}

@Test
public void shouldLoadSL4JClassesFromSystemClassLoader() throws Exception {
final Class exportedClass = TestJarExporter.class;
final File jarFile = jarCreator.create(exportedClass);
final ExporterJarClassLoader classLoader = ExporterJarClassLoader.ofPath(jarFile.toPath());
void shouldUseSystemClassLoaderAsFallback(final @TempDir File tempDir)
throws IOException, ClassNotFoundException {
final var exporterClass = ExternalExporter.createUnloadedExporterClass();
final var jarFile = exporterClass.toJar(new File(tempDir, "exporter.jar"));
final var classLoader = ExporterJarClassLoader.ofPath(jarFile.toPath());

// when
final Class loadedClass = classLoader.loadClass(Logger.class.getCanonicalName());
final var loadedClass = classLoader.loadClass(Logger.class.getCanonicalName());

// then
assertThat(loadedClass).isEqualTo(Logger.class);
}

@Test
public void shouldLoadLog4JClassesFromSystemClassLoader() throws Exception {
final Class exportedClass = TestJarExporter.class;
final File jarFile = jarCreator.create(exportedClass);
final ExporterJarClassLoader classLoader = ExporterJarClassLoader.ofPath(jarFile.toPath());

// when
final Class loadedClass = classLoader.loadClass(LogManager.class.getCanonicalName());

// then
assertThat(loadedClass).isEqualTo(LogManager.class);
assertThat(classLoader.getParent())
.isEqualTo(getClass().getClassLoader())
.isEqualTo(ClassLoader.getSystemClassLoader());
}
}
Loading

0 comments on commit 381f201

Please sign in to comment.