diff --git a/conf/functions_worker.yml b/conf/functions_worker.yml
index 4c5b6aab1b7f4..8c62536971990 100644
--- a/conf/functions_worker.yml
+++ b/conf/functions_worker.yml
@@ -43,6 +43,16 @@ metadataStoreOperationTimeoutSeconds: 30
# Metadata store cache expiry time in seconds
metadataStoreCacheExpirySeconds: 300
+# Specifies if the function worker should use classloading for validating submissions for built-in
+# connectors and functions. This is required for validateConnectorConfig to take effect.
+# Default is false.
+enableClassloadingOfBuiltinFiles: false
+
+# Specifies if the function worker should use classloading for validating submissions for external
+# connectors and functions. This is required for validateConnectorConfig to take effect.
+# Default is false.
+enableClassloadingOfExternalFiles: false
+
################################
# Function package management
################################
@@ -400,7 +410,10 @@ saslJaasServerRoleTokenSignerSecretPath:
connectorsDirectory: ./connectors
functionsDirectory: ./functions
-# Should connector config be validated during submission
+# Enables extended validation for connector config with fine-grain annotation based validation
+# during submission. Classloading with either enableClassloadingOfExternalFiles or
+# enableClassloadingOfBuiltinFiles must be enabled on the worker for this to take effect.
+# Default is false.
validateConnectorConfig: false
# Whether to initialize distributed log metadata by runtime.
diff --git a/distribution/server/src/assemble/LICENSE.bin.txt b/distribution/server/src/assemble/LICENSE.bin.txt
index 4755e14911ac2..3bb20c6d23ba2 100644
--- a/distribution/server/src/assemble/LICENSE.bin.txt
+++ b/distribution/server/src/assemble/LICENSE.bin.txt
@@ -452,6 +452,10 @@ The Apache Software License, Version 2.0
* Jodah
- net.jodah-typetools-0.5.0.jar
- net.jodah-failsafe-2.4.4.jar
+ * Byte Buddy
+ - net.bytebuddy-byte-buddy-1.14.12.jar
+ * zt-zip
+ - org.zeroturnaround-zt-zip-1.17.jar
* Apache Avro
- org.apache.avro-avro-1.11.3.jar
- org.apache.avro-avro-protobuf-1.11.3.jar
diff --git a/pom.xml b/pom.xml
index e0f45280a4f98..beb1700d167ac 100644
--- a/pom.xml
+++ b/pom.xml
@@ -166,6 +166,8 @@ flexible messaging model and an intuitive client API.
0.43.3
true
0.5.0
+ 1.14.12
+ 1.17
3.19.6
${protobuf3.version}
1.55.3
@@ -1066,6 +1068,18 @@ flexible messaging model and an intuitive client API.
${typetools.version}
+
+ net.bytebuddy
+ byte-buddy
+ ${byte-buddy.version}
+
+
+
+ org.zeroturnaround
+ zt-zip
+ ${zt-zip.version}
+
+
io.grpc
grpc-bom
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarClassLoader.java b/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarClassLoader.java
index 620e1156d3555..9736d8b47ef71 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarClassLoader.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarClassLoader.java
@@ -154,6 +154,11 @@ public NarClassLoader run() {
});
}
+ public static List getClasspathFromArchive(File narPath, String narExtractionDirectory) throws IOException {
+ File unpacked = NarUnpacker.unpackNar(narPath, getNarExtractionDirectory(narExtractionDirectory));
+ return getClassPathEntries(unpacked);
+ }
+
private static File getNarExtractionDirectory(String configuredDirectory) {
return new File(configuredDirectory + "/" + TMP_DIR_PREFIX);
}
@@ -164,16 +169,11 @@ private static File getNarExtractionDirectory(String configuredDirectory) {
* @param narWorkingDirectory
* directory to explode nar contents to
* @param parent
- * @throws IllegalArgumentException
- * if the NAR is missing the Java Services API file for FlowFileProcessor implementations.
- * @throws ClassNotFoundException
- * if any of the FlowFileProcessor implementations defined by the Java Services API cannot be
- * loaded.
* @throws IOException
* if an error occurs while loading the NAR.
*/
private NarClassLoader(final File narWorkingDirectory, Set additionalJars, ClassLoader parent)
- throws ClassNotFoundException, IOException {
+ throws IOException {
super(new URL[0], parent);
this.narWorkingDirectory = narWorkingDirectory;
@@ -238,22 +238,31 @@ public List getServiceImplementation(String serviceName) throws IOExcept
* if the URL list could not be updated.
*/
private void updateClasspath(File root) throws IOException {
- addURL(root.toURI().toURL()); // for compiled classes, META-INF/, etc.
+ getClassPathEntries(root).forEach(f -> {
+ try {
+ addURL(f.toURI().toURL());
+ } catch (IOException e) {
+ log.error("Failed to add entry to classpath: {}", f, e);
+ }
+ });
+ }
+ static List getClassPathEntries(File root) {
+ List classPathEntries = new ArrayList<>();
+ classPathEntries.add(root);
File dependencies = new File(root, "META-INF/bundled-dependencies");
if (!dependencies.isDirectory()) {
- log.warn("{} does not contain META-INF/bundled-dependencies!", narWorkingDirectory);
+ log.warn("{} does not contain META-INF/bundled-dependencies!", root);
}
- addURL(dependencies.toURI().toURL());
+ classPathEntries.add(dependencies);
if (dependencies.isDirectory()) {
final File[] jarFiles = dependencies.listFiles(JAR_FILTER);
if (jarFiles != null) {
Arrays.sort(jarFiles, Comparator.comparing(File::getName));
- for (File libJar : jarFiles) {
- addURL(libJar.toURI().toURL());
- }
+ classPathEntries.addAll(Arrays.asList(jarFiles));
}
}
+ return classPathEntries;
}
@Override
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarUnpacker.java b/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarUnpacker.java
index 9bd5bc48df819..1e34c3e4fe706 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarUnpacker.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarUnpacker.java
@@ -32,13 +32,14 @@
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
+import java.nio.file.Path;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;
import java.util.Enumeration;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.jar.JarEntry;
-import java.util.jar.JarFile;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipFile;
import lombok.extern.slf4j.Slf4j;
/**
@@ -113,18 +114,24 @@ static File doUnpackNar(final File nar, final File baseWorkingDirectory, Runnabl
* if the NAR could not be unpacked.
*/
private static void unpack(final File nar, final File workingDirectory) throws IOException {
- try (JarFile jarFile = new JarFile(nar)) {
- Enumeration jarEntries = jarFile.entries();
- while (jarEntries.hasMoreElements()) {
- JarEntry jarEntry = jarEntries.nextElement();
- String name = jarEntry.getName();
- File f = new File(workingDirectory, name);
- if (jarEntry.isDirectory()) {
+ Path workingDirectoryPath = workingDirectory.toPath().normalize();
+ try (ZipFile zipFile = new ZipFile(nar)) {
+ Enumeration extends ZipEntry> zipEntries = zipFile.entries();
+ while (zipEntries.hasMoreElements()) {
+ ZipEntry zipEntry = zipEntries.nextElement();
+ String name = zipEntry.getName();
+ Path targetFilePath = workingDirectoryPath.resolve(name).normalize();
+ if (!targetFilePath.startsWith(workingDirectoryPath)) {
+ log.error("Invalid zip file with entry '{}'", name);
+ throw new IOException("Invalid zip file. Aborting unpacking.");
+ }
+ File f = targetFilePath.toFile();
+ if (zipEntry.isDirectory()) {
FileUtils.ensureDirectoryExistAndCanReadAndWrite(f);
} else {
// The directory entry might appear after the file entry
FileUtils.ensureDirectoryExistAndCanReadAndWrite(f.getParentFile());
- makeFile(jarFile.getInputStream(jarEntry), f);
+ makeFile(zipFile.getInputStream(zipEntry), f);
}
}
}
diff --git a/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java b/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
index ed9b0af3b43d8..711fa33edb2a2 100644
--- a/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
+++ b/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
@@ -52,7 +52,9 @@
import lombok.Value;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.common.functions.FunctionConfig;
+import org.apache.pulsar.common.functions.FunctionDefinition;
import org.apache.pulsar.common.functions.Utils;
+import org.apache.pulsar.common.io.ConnectorDefinition;
import org.apache.pulsar.common.io.SinkConfig;
import org.apache.pulsar.common.io.SourceConfig;
import org.apache.pulsar.common.nar.FileUtils;
@@ -75,8 +77,11 @@
import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.utils.FunctionConfigUtils;
+import org.apache.pulsar.functions.utils.FunctionRuntimeCommon;
+import org.apache.pulsar.functions.utils.LoadedFunctionPackage;
import org.apache.pulsar.functions.utils.SinkConfigUtils;
import org.apache.pulsar.functions.utils.SourceConfigUtils;
+import org.apache.pulsar.functions.utils.ValidatableFunctionPackage;
import org.apache.pulsar.functions.utils.functioncache.FunctionCacheEntry;
import org.apache.pulsar.functions.utils.functions.FunctionArchive;
import org.apache.pulsar.functions.utils.functions.FunctionUtils;
@@ -357,9 +362,12 @@ public void start(boolean blocking) throws Exception {
userCodeFile = functionConfig.getJar();
userCodeClassLoader = extractClassLoader(
userCodeFile, ComponentType.FUNCTION, functionConfig.getClassName());
+ ValidatableFunctionPackage validatableFunctionPackage =
+ new LoadedFunctionPackage(getCurrentOrUserCodeClassLoader(),
+ FunctionDefinition.class);
functionDetails = FunctionConfigUtils.convert(
functionConfig,
- FunctionConfigUtils.validateJavaFunction(functionConfig, getCurrentOrUserCodeClassLoader()));
+ FunctionConfigUtils.validateJavaFunction(functionConfig, validatableFunctionPackage));
} else if (functionConfig.getRuntime() == FunctionConfig.Runtime.GO) {
userCodeFile = functionConfig.getGo();
} else if (functionConfig.getRuntime() == FunctionConfig.Runtime.PYTHON) {
@@ -369,7 +377,10 @@ public void start(boolean blocking) throws Exception {
}
if (functionDetails == null) {
- functionDetails = FunctionConfigUtils.convert(functionConfig, getCurrentOrUserCodeClassLoader());
+ ValidatableFunctionPackage validatableFunctionPackage =
+ new LoadedFunctionPackage(getCurrentOrUserCodeClassLoader(),
+ FunctionDefinition.class);
+ functionDetails = FunctionConfigUtils.convert(functionConfig, validatableFunctionPackage);
}
} else if (sourceConfig != null) {
inferMissingArguments(sourceConfig);
@@ -377,9 +388,10 @@ public void start(boolean blocking) throws Exception {
parallelism = sourceConfig.getParallelism();
userCodeClassLoader = extractClassLoader(
userCodeFile, ComponentType.SOURCE, sourceConfig.getClassName());
- functionDetails = SourceConfigUtils.convert(
- sourceConfig,
- SourceConfigUtils.validateAndExtractDetails(sourceConfig, getCurrentOrUserCodeClassLoader(), true));
+ ValidatableFunctionPackage validatableFunctionPackage =
+ new LoadedFunctionPackage(getCurrentOrUserCodeClassLoader(), ConnectorDefinition.class);
+ functionDetails = SourceConfigUtils.convert(sourceConfig,
+ SourceConfigUtils.validateAndExtractDetails(sourceConfig, validatableFunctionPackage, true));
} else if (sinkConfig != null) {
inferMissingArguments(sinkConfig);
userCodeFile = sinkConfig.getArchive();
@@ -387,6 +399,8 @@ public void start(boolean blocking) throws Exception {
parallelism = sinkConfig.getParallelism();
userCodeClassLoader = extractClassLoader(
userCodeFile, ComponentType.SINK, sinkConfig.getClassName());
+ ValidatableFunctionPackage validatableFunctionPackage =
+ new LoadedFunctionPackage(getCurrentOrUserCodeClassLoader(), ConnectorDefinition.class);
if (isNotEmpty(sinkConfig.getTransformFunction())) {
transformFunctionCodeClassLoader = extractClassLoader(
sinkConfig.getTransformFunction(),
@@ -395,16 +409,19 @@ public void start(boolean blocking) throws Exception {
}
ClassLoader functionClassLoader = null;
+ ValidatableFunctionPackage validatableTransformFunction = null;
if (transformFunctionCodeClassLoader != null) {
functionClassLoader = transformFunctionCodeClassLoader.getClassLoader() == null
? Thread.currentThread().getContextClassLoader()
: transformFunctionCodeClassLoader.getClassLoader();
+ validatableTransformFunction =
+ new LoadedFunctionPackage(functionClassLoader, FunctionDefinition.class);
}
functionDetails = SinkConfigUtils.convert(
sinkConfig,
- SinkConfigUtils.validateAndExtractDetails(sinkConfig, getCurrentOrUserCodeClassLoader(),
- functionClassLoader, true));
+ SinkConfigUtils.validateAndExtractDetails(sinkConfig, validatableFunctionPackage,
+ validatableTransformFunction, true));
} else {
throw new IllegalArgumentException("Must specify Function, Source or Sink config");
}
@@ -472,7 +489,7 @@ private UserCodeClassLoader extractClassLoader(String userCodeFile, ComponentTyp
if (classLoader == null) {
if (userCodeFile != null && Utils.isFunctionPackageUrlSupported(userCodeFile)) {
File file = FunctionCommon.extractFileFromPkgURL(userCodeFile);
- classLoader = FunctionCommon.getClassLoaderFromPackage(
+ classLoader = FunctionRuntimeCommon.getClassLoaderFromPackage(
componentType, className, file, narExtractionDirectory);
classLoaderCreated = true;
} else if (userCodeFile != null) {
@@ -494,7 +511,7 @@ private UserCodeClassLoader extractClassLoader(String userCodeFile, ComponentTyp
}
throw new RuntimeException(errorMsg + " (" + userCodeFile + ") does not exist");
}
- classLoader = FunctionCommon.getClassLoaderFromPackage(
+ classLoader = FunctionRuntimeCommon.getClassLoaderFromPackage(
componentType, className, file, narExtractionDirectory);
classLoaderCreated = true;
} else {
@@ -713,7 +730,7 @@ private ClassLoader isBuiltInFunction(String functionType) throws IOException {
FunctionArchive function = functions.get(functionName);
if (function != null && function.getFunctionDefinition().getFunctionClass() != null) {
// Function type is a valid built-in type.
- return function.getClassLoader();
+ return function.getFunctionPackage().getClassLoader();
} else {
return null;
}
@@ -727,7 +744,7 @@ private ClassLoader isBuiltInSource(String sourceType) throws IOException {
Connector connector = connectors.get(source);
if (connector != null && connector.getConnectorDefinition().getSourceClass() != null) {
// Source type is a valid built-in connector type.
- return connector.getClassLoader();
+ return connector.getConnectorFunctionPackage().getClassLoader();
} else {
return null;
}
@@ -741,18 +758,18 @@ private ClassLoader isBuiltInSink(String sinkType) throws IOException {
Connector connector = connectors.get(sink);
if (connector != null && connector.getConnectorDefinition().getSinkClass() != null) {
// Sink type is a valid built-in connector type
- return connector.getClassLoader();
+ return connector.getConnectorFunctionPackage().getClassLoader();
} else {
return null;
}
}
private TreeMap getFunctions() throws IOException {
- return FunctionUtils.searchForFunctions(functionsDir);
+ return FunctionUtils.searchForFunctions(functionsDir, narExtractionDirectory, true);
}
private TreeMap getConnectors() throws IOException {
- return ConnectorUtils.searchForConnectors(connectorsDir, narExtractionDirectory);
+ return ConnectorUtils.searchForConnectors(connectorsDir, narExtractionDirectory, true);
}
private SecretsProviderConfigurator getSecretsProviderConfigurator() {
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java
index 89281a2f550e2..e23838cb34396 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java
@@ -38,6 +38,9 @@
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
+import net.bytebuddy.description.type.TypeDefinition;
+import net.bytebuddy.dynamic.ClassFileLocator;
+import net.bytebuddy.pool.TypePool;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.common.functions.WindowConfig;
import org.apache.pulsar.common.nar.NarClassLoader;
@@ -325,7 +328,8 @@ public void close() {
}
private void inferringMissingTypeClassName(Function.FunctionDetails.Builder functionDetailsBuilder,
- ClassLoader classLoader) throws ClassNotFoundException {
+ ClassLoader classLoader) {
+ TypePool typePool = TypePool.Default.of(ClassFileLocator.ForClassLoader.of(classLoader));
switch (functionDetailsBuilder.getComponentType()) {
case FUNCTION:
if ((functionDetailsBuilder.hasSource()
@@ -344,14 +348,13 @@ private void inferringMissingTypeClassName(Function.FunctionDetails.Builder func
WindowConfig.class);
className = windowConfig.getActualWindowFunctionClassName();
}
-
- Class>[] typeArgs = FunctionCommon.getFunctionTypes(classLoader.loadClass(className),
+ TypeDefinition[] typeArgs = FunctionCommon.getFunctionTypes(typePool.describe(className).resolve(),
isWindowConfigPresent);
if (functionDetailsBuilder.hasSource()
&& functionDetailsBuilder.getSource().getTypeClassName().isEmpty()
&& typeArgs[0] != null) {
Function.SourceSpec.Builder sourceBuilder = functionDetailsBuilder.getSource().toBuilder();
- sourceBuilder.setTypeClassName(typeArgs[0].getName());
+ sourceBuilder.setTypeClassName(typeArgs[0].asErasure().getTypeName());
functionDetailsBuilder.setSource(sourceBuilder.build());
}
@@ -359,7 +362,7 @@ private void inferringMissingTypeClassName(Function.FunctionDetails.Builder func
&& functionDetailsBuilder.getSink().getTypeClassName().isEmpty()
&& typeArgs[1] != null) {
Function.SinkSpec.Builder sinkBuilder = functionDetailsBuilder.getSink().toBuilder();
- sinkBuilder.setTypeClassName(typeArgs[1].getName());
+ sinkBuilder.setTypeClassName(typeArgs[1].asErasure().getTypeName());
functionDetailsBuilder.setSink(sinkBuilder.build());
}
}
@@ -368,7 +371,8 @@ private void inferringMissingTypeClassName(Function.FunctionDetails.Builder func
if ((functionDetailsBuilder.hasSink()
&& functionDetailsBuilder.getSink().getTypeClassName().isEmpty())) {
String typeArg =
- getSinkType(functionDetailsBuilder.getSink().getClassName(), classLoader).getName();
+ getSinkType(functionDetailsBuilder.getSink().getClassName(), typePool).asErasure()
+ .getTypeName();
Function.SinkSpec.Builder sinkBuilder =
Function.SinkSpec.newBuilder(functionDetailsBuilder.getSink());
@@ -387,7 +391,8 @@ private void inferringMissingTypeClassName(Function.FunctionDetails.Builder func
if ((functionDetailsBuilder.hasSource()
&& functionDetailsBuilder.getSource().getTypeClassName().isEmpty())) {
String typeArg =
- getSourceType(functionDetailsBuilder.getSource().getClassName(), classLoader).getName();
+ getSourceType(functionDetailsBuilder.getSource().getClassName(), typePool).asErasure()
+ .getTypeName();
Function.SourceSpec.Builder sourceBuilder =
Function.SourceSpec.newBuilder(functionDetailsBuilder.getSource());
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java
index ed128568bcf50..9dca4015d5ef5 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java
@@ -124,17 +124,17 @@ private static ClassLoader getFunctionClassLoader(InstanceConfig instanceConfig,
if (componentType == Function.FunctionDetails.ComponentType.FUNCTION && functionsManager.isPresent()) {
return functionsManager.get()
.getFunction(instanceConfig.getFunctionDetails().getBuiltin())
- .getClassLoader();
+ .getFunctionPackage().getClassLoader();
}
if (componentType == Function.FunctionDetails.ComponentType.SOURCE && connectorsManager.isPresent()) {
return connectorsManager.get()
.getConnector(instanceConfig.getFunctionDetails().getSource().getBuiltin())
- .getClassLoader();
+ .getConnectorFunctionPackage().getClassLoader();
}
if (componentType == Function.FunctionDetails.ComponentType.SINK && connectorsManager.isPresent()) {
return connectorsManager.get()
.getConnector(instanceConfig.getFunctionDetails().getSink().getBuiltin())
- .getClassLoader();
+ .getConnectorFunctionPackage().getClassLoader();
}
}
return loadJars(jarFile, instanceConfig, functionId, instanceConfig.getFunctionDetails().getName(),
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/ConnectorsManager.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/ConnectorsManager.java
index e1770b8b64415..19d31d0f63b1d 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/ConnectorsManager.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/ConnectorsManager.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.functions.worker;
+import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.nio.file.Path;
import java.util.List;
@@ -27,18 +28,35 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.common.io.ConfigFieldDefinition;
import org.apache.pulsar.common.io.ConnectorDefinition;
+import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory;
import org.apache.pulsar.functions.utils.io.Connector;
import org.apache.pulsar.functions.utils.io.ConnectorUtils;
@Slf4j
-public class ConnectorsManager {
+public class ConnectorsManager implements AutoCloseable {
@Getter
private volatile TreeMap connectors;
+ @VisibleForTesting
+ public ConnectorsManager() {
+ this.connectors = new TreeMap<>();
+ }
+
public ConnectorsManager(WorkerConfig workerConfig) throws IOException {
- this.connectors = ConnectorUtils
- .searchForConnectors(workerConfig.getConnectorsDirectory(), workerConfig.getNarExtractionDirectory());
+ this.connectors = createConnectors(workerConfig);
+ }
+
+ private static TreeMap createConnectors(WorkerConfig workerConfig) throws IOException {
+ boolean enableClassloading = workerConfig.getEnableClassloadingOfBuiltinFiles()
+ || ThreadRuntimeFactory.class.getName().equals(workerConfig.getFunctionRuntimeFactoryClassName());
+ return ConnectorUtils.searchForConnectors(workerConfig.getConnectorsDirectory(),
+ workerConfig.getNarExtractionDirectory(), enableClassloading);
+ }
+
+ @VisibleForTesting
+ public void addConnector(String connectorType, Connector connector) {
+ connectors.put(connectorType, connector);
}
public Connector getConnector(String connectorType) {
@@ -71,7 +89,25 @@ public Path getSinkArchive(String sinkType) {
}
public void reloadConnectors(WorkerConfig workerConfig) throws IOException {
- connectors = ConnectorUtils
- .searchForConnectors(workerConfig.getConnectorsDirectory(), workerConfig.getNarExtractionDirectory());
+ TreeMap oldConnectors = connectors;
+ this.connectors = createConnectors(workerConfig);
+ closeConnectors(oldConnectors);
}
+
+ @Override
+ public void close() {
+ closeConnectors(connectors);
+ }
+
+ private void closeConnectors(TreeMap connectorMap) {
+ connectorMap.values().forEach(connector -> {
+ try {
+ connector.close();
+ } catch (Exception e) {
+ log.warn("Failed to close connector", e);
+ }
+ });
+ connectorMap.clear();
+ }
+
}
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/FunctionsManager.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/FunctionsManager.java
index 9199d568cad03..5ab7ff7221abb 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/FunctionsManager.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/FunctionsManager.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.functions.worker;
+import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.nio.file.Path;
import java.util.List;
@@ -25,16 +26,25 @@
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.common.functions.FunctionDefinition;
+import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory;
import org.apache.pulsar.functions.utils.functions.FunctionArchive;
import org.apache.pulsar.functions.utils.functions.FunctionUtils;
@Slf4j
-public class FunctionsManager {
-
+public class FunctionsManager implements AutoCloseable {
private TreeMap functions;
+ @VisibleForTesting
+ public FunctionsManager() {
+ this.functions = new TreeMap<>();
+ }
+
public FunctionsManager(WorkerConfig workerConfig) throws IOException {
- this.functions = FunctionUtils.searchForFunctions(workerConfig.getFunctionsDirectory());
+ this.functions = createFunctions(workerConfig);
+ }
+
+ public void addFunction(String functionType, FunctionArchive functionArchive) {
+ functions.put(functionType, functionArchive);
}
public FunctionArchive getFunction(String functionType) {
@@ -51,6 +61,32 @@ public List getFunctionDefinitions() {
}
public void reloadFunctions(WorkerConfig workerConfig) throws IOException {
- this.functions = FunctionUtils.searchForFunctions(workerConfig.getFunctionsDirectory());
+ TreeMap oldFunctions = functions;
+ this.functions = createFunctions(workerConfig);
+ closeFunctions(oldFunctions);
+ }
+
+ private static TreeMap createFunctions(WorkerConfig workerConfig) throws IOException {
+ boolean enableClassloading = workerConfig.getEnableClassloadingOfBuiltinFiles()
+ || ThreadRuntimeFactory.class.getName().equals(workerConfig.getFunctionRuntimeFactoryClassName());
+ return FunctionUtils.searchForFunctions(workerConfig.getFunctionsDirectory(),
+ workerConfig.getNarExtractionDirectory(),
+ enableClassloading);
+ }
+
+ @Override
+ public void close() {
+ closeFunctions(functions);
+ }
+
+ private void closeFunctions(TreeMap functionMap) {
+ functionMap.values().forEach(functionArchive -> {
+ try {
+ functionArchive.close();
+ } catch (Exception e) {
+ log.warn("Failed to close function archive", e);
+ }
+ });
+ functionMap.clear();
}
}
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
index 0ed73953d7aa7..2d9698103fa0f 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
@@ -238,6 +238,22 @@ public class WorkerConfig implements Serializable, PulsarConfiguration {
)
private boolean zooKeeperAllowReadOnlyOperations;
+ @FieldContext(
+ category = CATEGORY_WORKER,
+ doc = "Specifies if the function worker should use classloading for validating submissions for built-in "
+ + "connectors and functions. This is required for validateConnectorConfig to take effect. "
+ + "Default is false."
+ )
+ private Boolean enableClassloadingOfBuiltinFiles = false;
+
+ @FieldContext(
+ category = CATEGORY_WORKER,
+ doc = "Specifies if the function worker should use classloading for validating submissions for external "
+ + "connectors and functions. This is required for validateConnectorConfig to take effect. "
+ + "Default is false."
+ )
+ private Boolean enableClassloadingOfExternalFiles = false;
+
@FieldContext(
category = CATEGORY_CONNECTORS,
doc = "The path to the location to locate builtin connectors"
@@ -250,7 +266,10 @@ public class WorkerConfig implements Serializable, PulsarConfiguration {
private String narExtractionDirectory = NarClassLoader.DEFAULT_NAR_EXTRACTION_DIR;
@FieldContext(
category = CATEGORY_CONNECTORS,
- doc = "Should we validate connector config during submission"
+ doc = "Enables extended validation for connector config with fine-grain annotation based validation "
+ + "during submission. Classloading with either enableClassloadingOfExternalFiles or "
+ + "enableClassloadingOfBuiltinFiles must be enabled on the worker for this to take effect. "
+ + "Default is false."
)
private Boolean validateConnectorConfig = false;
@FieldContext(
diff --git a/pulsar-functions/utils/pom.xml b/pulsar-functions/utils/pom.xml
index e09ca7349fcf8..fdc8ab64274f4 100644
--- a/pulsar-functions/utils/pom.xml
+++ b/pulsar-functions/utils/pom.xml
@@ -87,6 +87,17 @@
typetools
+
+ net.bytebuddy
+ byte-buddy
+
+
+
+ org.zeroturnaround
+ zt-zip
+ 1.17
+
+
${project.groupId}
pulsar-client-original
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java
index 7df173da0f195..6a3d2f6ad7ddb 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java
@@ -22,16 +22,9 @@
import com.google.protobuf.AbstractMessage.Builder;
import com.google.protobuf.MessageOrBuilder;
import com.google.protobuf.util.JsonFormat;
-import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
-import java.io.ObjectOutputStream;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.ParameterizedType;
-import java.lang.reflect.Type;
-import java.net.MalformedURLException;
import java.net.ServerSocket;
import java.net.URISyntaxException;
import java.net.URL;
@@ -41,10 +34,14 @@
import java.util.Collection;
import java.util.Map;
import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
-import net.jodah.typetools.TypeResolver;
+import net.bytebuddy.description.type.TypeDefinition;
+import net.bytebuddy.description.type.TypeDescription;
+import net.bytebuddy.description.type.TypeList;
+import net.bytebuddy.pool.TypePool;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.MessageId;
@@ -54,16 +51,11 @@
import org.apache.pulsar.client.impl.auth.AuthenticationDataBasic;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.functions.Utils;
-import org.apache.pulsar.common.nar.NarClassLoader;
-import org.apache.pulsar.common.nar.NarClassLoaderBuilder;
-import org.apache.pulsar.common.util.ClassLoaderUtils;
import org.apache.pulsar.functions.api.Function;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.api.WindowFunction;
import org.apache.pulsar.functions.proto.Function.FunctionDetails.ComponentType;
import org.apache.pulsar.functions.proto.Function.FunctionDetails.Runtime;
-import org.apache.pulsar.functions.utils.functions.FunctionUtils;
-import org.apache.pulsar.functions.utils.io.ConnectorUtils;
import org.apache.pulsar.io.core.BatchSource;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.Source;
@@ -97,50 +89,74 @@ public static int findAvailablePort() {
}
}
- public static Class>[] getFunctionTypes(FunctionConfig functionConfig, ClassLoader classLoader)
+ public static TypeDefinition[] getFunctionTypes(FunctionConfig functionConfig, TypePool typePool)
throws ClassNotFoundException {
- return getFunctionTypes(functionConfig, classLoader.loadClass(functionConfig.getClassName()));
+ return getFunctionTypes(functionConfig, typePool.describe(functionConfig.getClassName()).resolve());
}
- public static Class>[] getFunctionTypes(FunctionConfig functionConfig, Class functionClass)
- throws ClassNotFoundException {
+ public static TypeDefinition[] getFunctionTypes(FunctionConfig functionConfig, TypeDefinition functionClass) {
boolean isWindowConfigPresent = functionConfig.getWindowConfig() != null;
return getFunctionTypes(functionClass, isWindowConfigPresent);
}
- public static Class>[] getFunctionTypes(Class> userClass, boolean isWindowConfigPresent) {
+ public static TypeDefinition[] getFunctionTypes(TypeDefinition userClass, boolean isWindowConfigPresent) {
Class> classParent = getFunctionClassParent(userClass, isWindowConfigPresent);
- Class>[] typeArgs = TypeResolver.resolveRawArguments(classParent, userClass);
+ TypeList.Generic typeArgsList = resolveInterfaceTypeArguments(userClass, classParent);
+ TypeDescription.Generic[] typeArgs = new TypeDescription.Generic[2];
+ typeArgs[0] = typeArgsList.get(0);
+ typeArgs[1] = typeArgsList.get(1);
// if window function
if (isWindowConfigPresent) {
if (classParent.equals(java.util.function.Function.class)) {
- if (!typeArgs[0].equals(Collection.class)) {
+ if (!typeArgs[0].asErasure().isAssignableTo(Collection.class)) {
throw new IllegalArgumentException("Window function must take a collection as input");
}
- typeArgs[0] = (Class>) unwrapType(classParent, userClass, 0);
+ typeArgs[0] = typeArgs[0].getTypeArguments().get(0);
}
}
- if (typeArgs[1].equals(Record.class)) {
- typeArgs[1] = (Class>) unwrapType(classParent, userClass, 1);
+ if (typeArgs[1].asErasure().isAssignableTo(Record.class)) {
+ typeArgs[1] = typeArgs[1].getTypeArguments().get(0);
+ }
+ if (typeArgs[1].asErasure().isAssignableTo(CompletableFuture.class)) {
+ typeArgs[1] = typeArgs[1].getTypeArguments().get(0);
}
-
return typeArgs;
}
- public static Class>[] getRawFunctionTypes(Class> userClass, boolean isWindowConfigPresent) {
+ private static TypeList.Generic resolveInterfaceTypeArguments(TypeDefinition userClass, Class> interfaceClass) {
+ if (!interfaceClass.isInterface()) {
+ throw new IllegalArgumentException("interfaceClass must be an interface");
+ }
+ for (TypeDescription.Generic interfaze : userClass.getInterfaces()) {
+ if (interfaze.asErasure().isAssignableTo(interfaceClass)) {
+ return interfaze.getTypeArguments();
+ }
+ }
+ if (userClass.getSuperClass() != null) {
+ return resolveInterfaceTypeArguments(userClass.getSuperClass(), interfaceClass);
+ }
+ return null;
+ }
+
+ public static TypeDescription.Generic[] getRawFunctionTypes(TypeDefinition userClass,
+ boolean isWindowConfigPresent) {
Class> classParent = getFunctionClassParent(userClass, isWindowConfigPresent);
- return TypeResolver.resolveRawArguments(classParent, userClass);
+ TypeList.Generic typeArgsList = resolveInterfaceTypeArguments(userClass, classParent);
+ TypeDescription.Generic[] typeArgs = new TypeDescription.Generic[2];
+ typeArgs[0] = typeArgsList.get(0);
+ typeArgs[1] = typeArgsList.get(1);
+ return typeArgs;
}
- public static Class> getFunctionClassParent(Class> userClass, boolean isWindowConfigPresent) {
+ public static Class> getFunctionClassParent(TypeDefinition userClass, boolean isWindowConfigPresent) {
if (isWindowConfigPresent) {
- if (WindowFunction.class.isAssignableFrom(userClass)) {
+ if (userClass.asErasure().isAssignableTo(WindowFunction.class)) {
return WindowFunction.class;
} else {
return java.util.function.Function.class;
}
} else {
- if (Function.class.isAssignableFrom(userClass)) {
+ if (userClass.asErasure().isAssignableTo(Function.class)) {
return Function.class;
} else {
return java.util.function.Function.class;
@@ -148,41 +164,6 @@ public static Class> getFunctionClassParent(Class> userClass, boolean isWind
}
}
- private static Type unwrapType(Class> type, Class> subType, int position) {
- Type genericType = TypeResolver.resolveGenericType(type, subType);
- Type argType = ((ParameterizedType) genericType).getActualTypeArguments()[position];
- return ((ParameterizedType) argType).getActualTypeArguments()[0];
- }
-
- public static Object createInstance(String userClassName, ClassLoader classLoader) {
- Class> theCls;
- try {
- theCls = Class.forName(userClassName);
- } catch (ClassNotFoundException | NoClassDefFoundError cnfe) {
- try {
- theCls = Class.forName(userClassName, true, classLoader);
- } catch (ClassNotFoundException | NoClassDefFoundError e) {
- throw new RuntimeException("User class must be in class path", cnfe);
- }
- }
- Object result;
- try {
- Constructor> meth = theCls.getDeclaredConstructor();
- meth.setAccessible(true);
- result = meth.newInstance();
- } catch (InstantiationException ie) {
- throw new RuntimeException("User class must be concrete", ie);
- } catch (NoSuchMethodException e) {
- throw new RuntimeException("User class doesn't have such method", e);
- } catch (IllegalAccessException e) {
- throw new RuntimeException("User class must have a no-arg constructor", e);
- } catch (InvocationTargetException e) {
- throw new RuntimeException("User class constructor throws exception", e);
- }
- return result;
-
- }
-
public static Runtime convertRuntime(FunctionConfig.Runtime runtime) {
for (Runtime type : Runtime.values()) {
if (type.name().equals(runtime.name())) {
@@ -223,29 +204,34 @@ public static FunctionConfig.ProcessingGuarantees convertProcessingGuarantee(
throw new RuntimeException("Unrecognized processing guarantee: " + processingGuarantees.name());
}
- public static Class> getSourceType(String className, ClassLoader classLoader) throws ClassNotFoundException {
- return getSourceType(classLoader.loadClass(className));
+ public static TypeDefinition getSourceType(String className, TypePool typePool) {
+ return getSourceType(typePool.describe(className).resolve());
}
- public static Class> getSourceType(Class sourceClass) {
-
- if (Source.class.isAssignableFrom(sourceClass)) {
- return TypeResolver.resolveRawArgument(Source.class, sourceClass);
- } else if (BatchSource.class.isAssignableFrom(sourceClass)) {
- return TypeResolver.resolveRawArgument(BatchSource.class, sourceClass);
+ public static TypeDefinition getSourceType(TypeDefinition sourceClass) {
+ if (sourceClass.asErasure().isAssignableTo(Source.class)) {
+ return resolveInterfaceTypeArguments(sourceClass, Source.class).get(0);
+ } else if (sourceClass.asErasure().isAssignableTo(BatchSource.class)) {
+ return resolveInterfaceTypeArguments(sourceClass, BatchSource.class).get(0);
} else {
throw new IllegalArgumentException(
String.format("Source class %s does not implement the correct interface",
- sourceClass.getName()));
+ sourceClass.getActualName()));
}
}
- public static Class> getSinkType(String className, ClassLoader classLoader) throws ClassNotFoundException {
- return getSinkType(classLoader.loadClass(className));
+ public static TypeDefinition getSinkType(String className, TypePool typePool) {
+ return getSinkType(typePool.describe(className).resolve());
}
- public static Class> getSinkType(Class sinkClass) {
- return TypeResolver.resolveRawArgument(Sink.class, sinkClass);
+ public static TypeDefinition getSinkType(TypeDefinition sinkClass) {
+ if (sinkClass.asErasure().isAssignableTo(Sink.class)) {
+ return resolveInterfaceTypeArguments(sinkClass, Sink.class).get(0);
+ } else {
+ throw new IllegalArgumentException(
+ String.format("Sink class %s does not implement the correct interface",
+ sinkClass.getActualName()));
+ }
}
public static void downloadFromHttpUrl(String destPkgUrl, File targetFile) throws IOException {
@@ -264,16 +250,6 @@ public static void downloadFromHttpUrl(String destPkgUrl, File targetFile) throw
log.info("Downloading function package from {} to {} completed!", destPkgUrl, targetFile.getAbsoluteFile());
}
- public static ClassLoader extractClassLoader(String destPkgUrl) throws IOException, URISyntaxException {
- File file = extractFileFromPkgURL(destPkgUrl);
- try {
- return ClassLoaderUtils.loadJar(file);
- } catch (MalformedURLException e) {
- throw new IllegalArgumentException(
- "Corrupt User PackageFile " + file + " with error " + e.getMessage());
- }
- }
-
public static File createPkgTempFile() throws IOException {
return File.createTempFile("functions", ".tmp");
}
@@ -297,21 +273,6 @@ public static File extractFileFromPkgURL(String destPkgUrl) throws IOException,
}
}
- public static NarClassLoader extractNarClassLoader(File packageFile,
- String narExtractionDirectory) {
- if (packageFile != null) {
- try {
- return NarClassLoaderBuilder.builder()
- .narFile(packageFile)
- .extractionDirectory(narExtractionDirectory)
- .build();
- } catch (IOException e) {
- throw new IllegalArgumentException(e.getMessage());
- }
- }
- return null;
- }
-
public static String getFullyQualifiedInstanceId(org.apache.pulsar.functions.proto.Function.Instance instance) {
return getFullyQualifiedInstanceId(
instance.getFunctionMetaData().getFunctionDetails().getTenant(),
@@ -345,17 +306,6 @@ public static final MessageId getMessageId(long sequenceId) {
return new MessageIdImpl(ledgerId, entryId, -1);
}
- public static byte[] toByteArray(Object obj) throws IOException {
- byte[] bytes = null;
- try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
- ObjectOutputStream oos = new ObjectOutputStream(bos)) {
- oos.writeObject(obj);
- oos.flush();
- bytes = bos.toByteArray();
- }
- return bytes;
- }
-
public static String getUniquePackageName(String packageName) {
return String.format("%s-%s", UUID.randomUUID().toString(), packageName);
}
@@ -403,146 +353,11 @@ private static String extractFromFullyQualifiedName(String fqfn, int index) {
throw new RuntimeException("Invalid Fully Qualified Function Name " + fqfn);
}
- public static Class> getTypeArg(String className, Class> funClass, ClassLoader classLoader)
- throws ClassNotFoundException {
- Class> loadedClass = classLoader.loadClass(className);
- if (!funClass.isAssignableFrom(loadedClass)) {
- throw new IllegalArgumentException(
- String.format("class %s is not type of %s", className, funClass.getName()));
- }
- return TypeResolver.resolveRawArgument(funClass, loadedClass);
- }
-
public static double roundDecimal(double value, int places) {
double scale = Math.pow(10, places);
return Math.round(value * scale) / scale;
}
- public static ClassLoader getClassLoaderFromPackage(
- ComponentType componentType,
- String className,
- File packageFile,
- String narExtractionDirectory) {
- String connectorClassName = className;
- ClassLoader jarClassLoader = null;
- boolean keepJarClassLoader = false;
- ClassLoader narClassLoader = null;
- boolean keepNarClassLoader = false;
-
- Exception jarClassLoaderException = null;
- Exception narClassLoaderException = null;
-
- try {
- try {
- jarClassLoader = ClassLoaderUtils.extractClassLoader(packageFile);
- } catch (Exception e) {
- jarClassLoaderException = e;
- }
- try {
- narClassLoader = FunctionCommon.extractNarClassLoader(packageFile, narExtractionDirectory);
- } catch (Exception e) {
- narClassLoaderException = e;
- }
-
- // if connector class name is not provided, we can only try to load archive as a NAR
- if (isEmpty(connectorClassName)) {
- if (narClassLoader == null) {
- throw new IllegalArgumentException(String.format("%s package does not have the correct format. "
- + "Pulsar cannot determine if the package is a NAR package or JAR package. "
- + "%s classname is not provided and attempts to load it as a NAR package produced "
- + "the following error.",
- capFirstLetter(componentType), capFirstLetter(componentType)),
- narClassLoaderException);
- }
- try {
- if (componentType == ComponentType.FUNCTION) {
- connectorClassName = FunctionUtils.getFunctionClass(narClassLoader);
- } else if (componentType == ComponentType.SOURCE) {
- connectorClassName = ConnectorUtils.getIOSourceClass((NarClassLoader) narClassLoader);
- } else {
- connectorClassName = ConnectorUtils.getIOSinkClass((NarClassLoader) narClassLoader);
- }
- } catch (IOException e) {
- throw new IllegalArgumentException(String.format("Failed to extract %s class from archive",
- componentType.toString().toLowerCase()), e);
- }
-
- try {
- narClassLoader.loadClass(connectorClassName);
- keepNarClassLoader = true;
- return narClassLoader;
- } catch (ClassNotFoundException | NoClassDefFoundError e) {
- throw new IllegalArgumentException(
- String.format("%s class %s must be in class path", capFirstLetter(componentType),
- connectorClassName), e);
- }
-
- } else {
- // if connector class name is provided, we need to try to load it as a JAR and as a NAR.
- if (jarClassLoader != null) {
- try {
- jarClassLoader.loadClass(connectorClassName);
- keepJarClassLoader = true;
- return jarClassLoader;
- } catch (ClassNotFoundException | NoClassDefFoundError e) {
- // class not found in JAR try loading as a NAR and searching for the class
- if (narClassLoader != null) {
-
- try {
- narClassLoader.loadClass(connectorClassName);
- keepNarClassLoader = true;
- return narClassLoader;
- } catch (ClassNotFoundException | NoClassDefFoundError e1) {
- throw new IllegalArgumentException(
- String.format("%s class %s must be in class path",
- capFirstLetter(componentType), connectorClassName), e1);
- }
- } else {
- throw new IllegalArgumentException(
- String.format("%s class %s must be in class path", capFirstLetter(componentType),
- connectorClassName), e);
- }
- }
- } else if (narClassLoader != null) {
- try {
- narClassLoader.loadClass(connectorClassName);
- keepNarClassLoader = true;
- return narClassLoader;
- } catch (ClassNotFoundException | NoClassDefFoundError e1) {
- throw new IllegalArgumentException(
- String.format("%s class %s must be in class path",
- capFirstLetter(componentType), connectorClassName), e1);
- }
- } else {
- StringBuilder errorMsg = new StringBuilder(capFirstLetter(componentType)
- + " package does not have the correct format."
- + " Pulsar cannot determine if the package is a NAR package or JAR package.");
-
- if (jarClassLoaderException != null) {
- errorMsg.append(
- " Attempts to load it as a JAR package produced error: " + jarClassLoaderException
- .getMessage());
- }
-
- if (narClassLoaderException != null) {
- errorMsg.append(
- " Attempts to load it as a NAR package produced error: " + narClassLoaderException
- .getMessage());
- }
-
- throw new IllegalArgumentException(errorMsg.toString());
- }
- }
- } finally {
- if (!keepJarClassLoader) {
- ClassLoaderUtils.closeClassLoader(jarClassLoader);
- }
- if (!keepNarClassLoader) {
- ClassLoaderUtils.closeClassLoader(narClassLoader);
- }
- }
- }
-
public static String capFirstLetter(Enum en) {
return StringUtils.capitalize(en.toString().toLowerCase());
}
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
index e4609672a3d0d..ee59317daf755 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
@@ -18,12 +18,11 @@
*/
package org.apache.pulsar.functions.utils;
-import static org.apache.commons.lang.StringUtils.isBlank;
-import static org.apache.commons.lang.StringUtils.isNotBlank;
-import static org.apache.commons.lang.StringUtils.isNotEmpty;
+import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.commons.lang3.StringUtils.isEmpty;
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import static org.apache.commons.lang3.StringUtils.isNotEmpty;
import static org.apache.pulsar.common.functions.Utils.BUILTIN;
-import static org.apache.pulsar.common.util.ClassLoaderUtils.loadJar;
import static org.apache.pulsar.functions.utils.FunctionCommon.convertFromCompressionType;
import static org.apache.pulsar.functions.utils.FunctionCommon.convertFromFunctionDetailsCompressionType;
import static org.apache.pulsar.functions.utils.FunctionCommon.convertFromFunctionDetailsSubscriptionPosition;
@@ -32,9 +31,7 @@
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import java.io.File;
-import java.io.IOException;
import java.lang.reflect.Type;
-import java.net.MalformedURLException;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
@@ -44,10 +41,13 @@
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang.StringUtils;
+import net.bytebuddy.description.type.TypeDefinition;
+import net.bytebuddy.pool.TypePool;
+import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.common.functions.ConsumerConfig;
import org.apache.pulsar.common.functions.FunctionConfig;
+import org.apache.pulsar.common.functions.FunctionDefinition;
import org.apache.pulsar.common.functions.ProducerConfig;
import org.apache.pulsar.common.functions.Resources;
import org.apache.pulsar.common.functions.WindowConfig;
@@ -55,7 +55,6 @@
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
-import org.apache.pulsar.functions.utils.functions.FunctionUtils;
@Slf4j
public class FunctionConfigUtils {
@@ -74,26 +73,21 @@ public static class ExtractedFunctionDetails {
private static final ObjectMapper OBJECT_MAPPER = ObjectMapperFactory.create();
- public static FunctionDetails convert(FunctionConfig functionConfig, ClassLoader classLoader)
- throws IllegalArgumentException {
+ public static FunctionDetails convert(FunctionConfig functionConfig) {
+ return convert(functionConfig, (ValidatableFunctionPackage) null);
+ }
- if (functionConfig.getRuntime() == FunctionConfig.Runtime.JAVA) {
- if (classLoader != null) {
- try {
- Class>[] typeArgs = FunctionCommon.getFunctionTypes(functionConfig, classLoader);
- return convert(
- functionConfig,
- new ExtractedFunctionDetails(
- functionConfig.getClassName(),
- typeArgs[0].getName(),
- typeArgs[1].getName()));
- } catch (ClassNotFoundException | NoClassDefFoundError e) {
- throw new IllegalArgumentException(
- String.format("Function class %s must be in class path", functionConfig.getClassName()), e);
- }
- }
+ public static FunctionDetails convert(FunctionConfig functionConfig,
+ ValidatableFunctionPackage validatableFunctionPackage)
+ throws IllegalArgumentException {
+ if (functionConfig == null) {
+ throw new IllegalArgumentException("Function config is not provided");
+ }
+ if (functionConfig.getRuntime() == FunctionConfig.Runtime.JAVA && validatableFunctionPackage != null) {
+ return convert(functionConfig, doJavaChecks(functionConfig, validatableFunctionPackage));
+ } else {
+ return convert(functionConfig, new ExtractedFunctionDetails(functionConfig.getClassName(), null, null));
}
- return convert(functionConfig, new ExtractedFunctionDetails(functionConfig.getClassName(), null, null));
}
public static FunctionDetails convert(FunctionConfig functionConfig, ExtractedFunctionDetails extractedDetails)
@@ -593,48 +587,49 @@ public static void inferMissingArguments(FunctionConfig functionConfig,
}
}
- public static ExtractedFunctionDetails doJavaChecks(FunctionConfig functionConfig, ClassLoader clsLoader) {
+ public static ExtractedFunctionDetails doJavaChecks(FunctionConfig functionConfig,
+ ValidatableFunctionPackage validatableFunctionPackage) {
- String functionClassName = functionConfig.getClassName();
- Class functionClass;
+ String functionClassName = StringUtils.trimToNull(functionConfig.getClassName());
+ TypeDefinition functionClass;
try {
// if class name in function config is not set, this should be a built-in function
// thus we should try to find its class name in the NAR service definition
if (functionClassName == null) {
- try {
- functionClassName = FunctionUtils.getFunctionClass(clsLoader);
- } catch (IOException e) {
- throw new IllegalArgumentException("Failed to extract source class from archive", e);
+ FunctionDefinition functionDefinition =
+ validatableFunctionPackage.getFunctionMetaData(FunctionDefinition.class);
+ if (functionDefinition == null) {
+ throw new IllegalArgumentException("Function class name is not provided.");
+ }
+ functionClassName = functionDefinition.getFunctionClass();
+ if (functionClassName == null) {
+ throw new IllegalArgumentException("Function class name is not provided.");
}
}
- functionClass = clsLoader.loadClass(functionClassName);
+ functionClass = validatableFunctionPackage.resolveType(functionClassName);
- if (!org.apache.pulsar.functions.api.Function.class.isAssignableFrom(functionClass)
- && !java.util.function.Function.class.isAssignableFrom(functionClass)
- && !org.apache.pulsar.functions.api.WindowFunction.class.isAssignableFrom(functionClass)) {
+ if (!functionClass.asErasure().isAssignableTo(org.apache.pulsar.functions.api.Function.class)
+ && !functionClass.asErasure().isAssignableTo(java.util.function.Function.class)
+ && !functionClass.asErasure()
+ .isAssignableTo(org.apache.pulsar.functions.api.WindowFunction.class)) {
throw new IllegalArgumentException(
String.format("Function class %s does not implement the correct interface",
- functionClass.getName()));
+ functionClassName));
}
- } catch (ClassNotFoundException | NoClassDefFoundError e) {
+ } catch (TypePool.Resolution.NoSuchTypeException e) {
throw new IllegalArgumentException(
- String.format("Function class %s must be in class path", functionConfig.getClassName()), e);
+ String.format("Function class %s must be in class path", functionClassName), e);
}
- Class>[] typeArgs;
- try {
- typeArgs = FunctionCommon.getFunctionTypes(functionConfig, functionClass);
- } catch (ClassNotFoundException | NoClassDefFoundError e) {
- throw new IllegalArgumentException(
- String.format("Function class %s must be in class path", functionConfig.getClassName()), e);
- }
+ TypeDefinition[] typeArgs = FunctionCommon.getFunctionTypes(functionConfig, functionClass);
// inputs use default schema, so there is no check needed there
// Check if the Input serialization/deserialization class exists in jar or already loaded and that it
// implements SerDe class
if (functionConfig.getCustomSerdeInputs() != null) {
functionConfig.getCustomSerdeInputs().forEach((topicName, inputSerializer) -> {
- ValidatorUtils.validateSerde(inputSerializer, typeArgs[0], clsLoader, true);
+ ValidatorUtils.validateSerde(inputSerializer, typeArgs[0], validatableFunctionPackage.getTypePool(),
+ true);
});
}
@@ -649,8 +644,8 @@ public static ExtractedFunctionDetails doJavaChecks(FunctionConfig functionConfi
throw new IllegalArgumentException(
String.format("Topic %s has an incorrect schema Info", topicName));
}
- ValidatorUtils.validateSchema(consumerConfig.getSchemaType(), typeArgs[0], clsLoader, true);
-
+ ValidatorUtils.validateSchema(consumerConfig.getSchemaType(), typeArgs[0],
+ validatableFunctionPackage.getTypePool(), true);
});
}
@@ -665,13 +660,16 @@ public static ExtractedFunctionDetails doJavaChecks(FunctionConfig functionConfi
"Only one of schemaType or serdeClassName should be set in inputSpec");
}
if (!isEmpty(conf.getSerdeClassName())) {
- ValidatorUtils.validateSerde(conf.getSerdeClassName(), typeArgs[0], clsLoader, true);
+ ValidatorUtils.validateSerde(conf.getSerdeClassName(), typeArgs[0],
+ validatableFunctionPackage.getTypePool(), true);
}
if (!isEmpty(conf.getSchemaType())) {
- ValidatorUtils.validateSchema(conf.getSchemaType(), typeArgs[0], clsLoader, true);
+ ValidatorUtils.validateSchema(conf.getSchemaType(), typeArgs[0],
+ validatableFunctionPackage.getTypePool(), true);
}
if (conf.getCryptoConfig() != null) {
- ValidatorUtils.validateCryptoKeyReader(conf.getCryptoConfig(), clsLoader, false);
+ ValidatorUtils.validateCryptoKeyReader(conf.getCryptoConfig(),
+ validatableFunctionPackage.getTypePool(), false);
}
});
}
@@ -679,8 +677,8 @@ public static ExtractedFunctionDetails doJavaChecks(FunctionConfig functionConfi
if (Void.class.equals(typeArgs[1])) {
return new FunctionConfigUtils.ExtractedFunctionDetails(
functionClassName,
- typeArgs[0].getName(),
- typeArgs[1].getName());
+ typeArgs[0].asErasure().getTypeName(),
+ typeArgs[1].asErasure().getTypeName());
}
// One and only one of outputSchemaType and outputSerdeClassName should be set
@@ -690,22 +688,25 @@ public static ExtractedFunctionDetails doJavaChecks(FunctionConfig functionConfi
}
if (!isEmpty(functionConfig.getOutputSchemaType())) {
- ValidatorUtils.validateSchema(functionConfig.getOutputSchemaType(), typeArgs[1], clsLoader, false);
+ ValidatorUtils.validateSchema(functionConfig.getOutputSchemaType(), typeArgs[1],
+ validatableFunctionPackage.getTypePool(), false);
}
if (!isEmpty(functionConfig.getOutputSerdeClassName())) {
- ValidatorUtils.validateSerde(functionConfig.getOutputSerdeClassName(), typeArgs[1], clsLoader, false);
+ ValidatorUtils.validateSerde(functionConfig.getOutputSerdeClassName(), typeArgs[1],
+ validatableFunctionPackage.getTypePool(), false);
}
if (functionConfig.getProducerConfig() != null
&& functionConfig.getProducerConfig().getCryptoConfig() != null) {
ValidatorUtils
- .validateCryptoKeyReader(functionConfig.getProducerConfig().getCryptoConfig(), clsLoader, true);
+ .validateCryptoKeyReader(functionConfig.getProducerConfig().getCryptoConfig(),
+ validatableFunctionPackage.getTypePool(), true);
}
return new FunctionConfigUtils.ExtractedFunctionDetails(
functionClassName,
- typeArgs[0].getName(),
- typeArgs[1].getName());
+ typeArgs[0].asErasure().getTypeName(),
+ typeArgs[1].asErasure().getTypeName());
}
private static void doPythonChecks(FunctionConfig functionConfig) {
@@ -912,47 +913,21 @@ public static Collection collectAllInputTopics(FunctionConfig functionCo
return retval;
}
- public static ClassLoader validate(FunctionConfig functionConfig, File functionPackageFile) {
+ public static void validateNonJavaFunction(FunctionConfig functionConfig) {
doCommonChecks(functionConfig);
- if (functionConfig.getRuntime() == FunctionConfig.Runtime.JAVA) {
- ClassLoader classLoader;
- if (functionPackageFile != null) {
- try {
- classLoader = loadJar(functionPackageFile);
- } catch (MalformedURLException e) {
- throw new IllegalArgumentException("Corrupted Jar File", e);
- }
- } else if (!isEmpty(functionConfig.getJar())) {
- File jarFile = new File(functionConfig.getJar());
- if (!jarFile.exists()) {
- throw new IllegalArgumentException("Jar file does not exist");
- }
- try {
- classLoader = loadJar(jarFile);
- } catch (Exception e) {
- throw new IllegalArgumentException("Corrupted Jar File", e);
- }
- } else {
- throw new IllegalArgumentException("Function Package is not provided");
- }
-
- doJavaChecks(functionConfig, classLoader);
- return classLoader;
- } else if (functionConfig.getRuntime() == FunctionConfig.Runtime.GO) {
+ if (functionConfig.getRuntime() == FunctionConfig.Runtime.GO) {
doGolangChecks(functionConfig);
- return null;
} else if (functionConfig.getRuntime() == FunctionConfig.Runtime.PYTHON) {
doPythonChecks(functionConfig);
- return null;
} else {
throw new IllegalArgumentException("Function language runtime is either not set or cannot be determined");
}
}
public static ExtractedFunctionDetails validateJavaFunction(FunctionConfig functionConfig,
- ClassLoader classLoader) {
+ ValidatableFunctionPackage validatableFunctionPackage) {
doCommonChecks(functionConfig);
- return doJavaChecks(functionConfig, classLoader);
+ return doJavaChecks(functionConfig, validatableFunctionPackage);
}
public static FunctionConfig validateUpdate(FunctionConfig existingConfig, FunctionConfig newConfig) {
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionFilePackage.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionFilePackage.java
new file mode 100644
index 0000000000000..8224de32521fb
--- /dev/null
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionFilePackage.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.utils;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.net.MalformedURLException;
+import java.net.URLClassLoader;
+import java.util.ArrayList;
+import java.util.List;
+import net.bytebuddy.description.type.TypeDescription;
+import net.bytebuddy.dynamic.ClassFileLocator;
+import net.bytebuddy.pool.TypePool;
+import org.apache.pulsar.common.nar.NarClassLoader;
+import org.apache.pulsar.common.nar.NarClassLoaderBuilder;
+import org.apache.pulsar.functions.utils.functions.FunctionUtils;
+import org.zeroturnaround.zip.ZipUtil;
+
+/**
+ * FunctionFilePackage is a class that represents a function package and
+ * implements the ValidatableFunctionPackage interface which decouples the
+ * function package from classloading.
+ */
+public class FunctionFilePackage implements AutoCloseable, ValidatableFunctionPackage {
+ private final File file;
+ private final ClassFileLocator.Compound classFileLocator;
+ private final TypePool typePool;
+ private final boolean isNar;
+ private final String narExtractionDirectory;
+ private final boolean enableClassloading;
+
+ private ClassLoader classLoader;
+
+ private final Object configMetadata;
+
+ public FunctionFilePackage(File file, String narExtractionDirectory, boolean enableClassloading,
+ Class> configClass) {
+ this.file = file;
+ boolean nonZeroFile = file.isFile() && file.length() > 0;
+ this.isNar = nonZeroFile ? ZipUtil.containsAnyEntry(file,
+ new String[] {"META-INF/services/pulsar-io.yaml", "META-INF/bundled-dependencies"}) : false;
+ this.narExtractionDirectory = narExtractionDirectory;
+ this.enableClassloading = enableClassloading;
+ if (isNar) {
+ List classpathFromArchive = null;
+ try {
+ classpathFromArchive = NarClassLoader.getClasspathFromArchive(file, narExtractionDirectory);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ List classFileLocators = new ArrayList<>();
+ classFileLocators.add(ClassFileLocator.ForClassLoader.ofSystemLoader());
+ for (File classpath : classpathFromArchive) {
+ if (classpath.exists()) {
+ try {
+ ClassFileLocator locator;
+ if (classpath.isDirectory()) {
+ locator = new ClassFileLocator.ForFolder(classpath);
+ } else {
+ locator = ClassFileLocator.ForJarFile.of(classpath);
+ }
+ classFileLocators.add(locator);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+ }
+ this.classFileLocator = new ClassFileLocator.Compound(classFileLocators);
+ this.typePool = TypePool.Default.of(classFileLocator);
+ try {
+ this.configMetadata = FunctionUtils.getPulsarIOServiceConfig(file, configClass);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ } else {
+ try {
+ this.classFileLocator = nonZeroFile
+ ? new ClassFileLocator.Compound(ClassFileLocator.ForClassLoader.ofSystemLoader(),
+ ClassFileLocator.ForJarFile.of(file)) :
+ new ClassFileLocator.Compound(ClassFileLocator.ForClassLoader.ofSystemLoader());
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ this.typePool =
+ TypePool.Default.of(classFileLocator);
+ this.configMetadata = null;
+ }
+ }
+
+ public TypeDescription resolveType(String className) {
+ return typePool.describe(className).resolve();
+ }
+
+ public boolean isNar() {
+ return isNar;
+ }
+
+ public File getFile() {
+ return file;
+ }
+
+ public TypePool getTypePool() {
+ return typePool;
+ }
+
+ @Override
+ public T getFunctionMetaData(Class clazz) {
+ return configMetadata != null ? clazz.cast(configMetadata) : null;
+ }
+
+ @Override
+ public synchronized void close() throws IOException {
+ classFileLocator.close();
+ if (classLoader instanceof Closeable) {
+ ((Closeable) classLoader).close();
+ }
+ }
+
+ public boolean isEnableClassloading() {
+ return enableClassloading;
+ }
+
+ public synchronized ClassLoader getClassLoader() {
+ if (classLoader == null) {
+ classLoader = createClassLoader();
+ }
+ return classLoader;
+ }
+
+ private ClassLoader createClassLoader() {
+ if (enableClassloading) {
+ if (isNar) {
+ try {
+ return NarClassLoaderBuilder.builder()
+ .narFile(file)
+ .extractionDirectory(narExtractionDirectory)
+ .build();
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ } else {
+ try {
+ return new URLClassLoader(new java.net.URL[] {file.toURI().toURL()},
+ NarClassLoader.class.getClassLoader());
+ } catch (MalformedURLException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+ } else {
+ throw new IllegalStateException("Classloading is not enabled");
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "FunctionFilePackage{"
+ + "file=" + file
+ + ", isNar=" + isNar
+ + '}';
+ }
+}
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionRuntimeCommon.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionRuntimeCommon.java
new file mode 100644
index 0000000000000..ed17478dd00ed
--- /dev/null
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionRuntimeCommon.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.utils;
+
+import static org.apache.commons.lang3.StringUtils.isEmpty;
+import java.io.File;
+import java.io.IOException;
+import org.apache.pulsar.common.nar.NarClassLoader;
+import org.apache.pulsar.common.nar.NarClassLoaderBuilder;
+import org.apache.pulsar.common.util.ClassLoaderUtils;
+import org.apache.pulsar.functions.proto.Function;
+import org.apache.pulsar.functions.utils.functions.FunctionUtils;
+import org.apache.pulsar.functions.utils.io.ConnectorUtils;
+
+public class FunctionRuntimeCommon {
+ public static NarClassLoader extractNarClassLoader(File packageFile,
+ String narExtractionDirectory) {
+ if (packageFile != null) {
+ try {
+ return NarClassLoaderBuilder.builder()
+ .narFile(packageFile)
+ .extractionDirectory(narExtractionDirectory)
+ .build();
+ } catch (IOException e) {
+ throw new IllegalArgumentException(e.getMessage());
+ }
+ }
+ return null;
+ }
+
+ public static ClassLoader getClassLoaderFromPackage(
+ Function.FunctionDetails.ComponentType componentType,
+ String className,
+ File packageFile,
+ String narExtractionDirectory) {
+ String connectorClassName = className;
+ ClassLoader jarClassLoader = null;
+ boolean keepJarClassLoader = false;
+ NarClassLoader narClassLoader = null;
+ boolean keepNarClassLoader = false;
+
+ Exception jarClassLoaderException = null;
+ Exception narClassLoaderException = null;
+
+ try {
+ try {
+ jarClassLoader = ClassLoaderUtils.extractClassLoader(packageFile);
+ } catch (Exception e) {
+ jarClassLoaderException = e;
+ }
+ try {
+ narClassLoader = extractNarClassLoader(packageFile, narExtractionDirectory);
+ } catch (Exception e) {
+ narClassLoaderException = e;
+ }
+
+ // if connector class name is not provided, we can only try to load archive as a NAR
+ if (isEmpty(connectorClassName)) {
+ if (narClassLoader == null) {
+ throw new IllegalArgumentException(String.format("%s package does not have the correct format. "
+ + "Pulsar cannot determine if the package is a NAR package or JAR package. "
+ + "%s classname is not provided and attempts to load it as a NAR package produced "
+ + "the following error.",
+ FunctionCommon.capFirstLetter(componentType), FunctionCommon.capFirstLetter(componentType)),
+ narClassLoaderException);
+ }
+ try {
+ if (componentType == Function.FunctionDetails.ComponentType.FUNCTION) {
+ connectorClassName = FunctionUtils.getFunctionClass(narClassLoader);
+ } else if (componentType == Function.FunctionDetails.ComponentType.SOURCE) {
+ connectorClassName = ConnectorUtils.getIOSourceClass(narClassLoader);
+ } else {
+ connectorClassName = ConnectorUtils.getIOSinkClass(narClassLoader);
+ }
+ } catch (IOException e) {
+ throw new IllegalArgumentException(String.format("Failed to extract %s class from archive",
+ componentType.toString().toLowerCase()), e);
+ }
+
+ try {
+ narClassLoader.loadClass(connectorClassName);
+ keepNarClassLoader = true;
+ return narClassLoader;
+ } catch (ClassNotFoundException | NoClassDefFoundError e) {
+ throw new IllegalArgumentException(String.format("%s class %s must be in class path",
+ FunctionCommon.capFirstLetter(componentType), connectorClassName), e);
+ }
+
+ } else {
+ // if connector class name is provided, we need to try to load it as a JAR and as a NAR.
+ if (jarClassLoader != null) {
+ try {
+ jarClassLoader.loadClass(connectorClassName);
+ keepJarClassLoader = true;
+ return jarClassLoader;
+ } catch (ClassNotFoundException | NoClassDefFoundError e) {
+ // class not found in JAR try loading as a NAR and searching for the class
+ if (narClassLoader != null) {
+
+ try {
+ narClassLoader.loadClass(connectorClassName);
+ keepNarClassLoader = true;
+ return narClassLoader;
+ } catch (ClassNotFoundException | NoClassDefFoundError e1) {
+ throw new IllegalArgumentException(
+ String.format("%s class %s must be in class path",
+ FunctionCommon.capFirstLetter(componentType), connectorClassName), e1);
+ }
+ } else {
+ throw new IllegalArgumentException(String.format("%s class %s must be in class path",
+ FunctionCommon.capFirstLetter(componentType), connectorClassName), e);
+ }
+ }
+ } else if (narClassLoader != null) {
+ try {
+ narClassLoader.loadClass(connectorClassName);
+ keepNarClassLoader = true;
+ return narClassLoader;
+ } catch (ClassNotFoundException | NoClassDefFoundError e1) {
+ throw new IllegalArgumentException(
+ String.format("%s class %s must be in class path",
+ FunctionCommon.capFirstLetter(componentType), connectorClassName), e1);
+ }
+ } else {
+ StringBuilder errorMsg = new StringBuilder(FunctionCommon.capFirstLetter(componentType)
+ + " package does not have the correct format."
+ + " Pulsar cannot determine if the package is a NAR package or JAR package.");
+
+ if (jarClassLoaderException != null) {
+ errorMsg.append(
+ " Attempts to load it as a JAR package produced error: " + jarClassLoaderException
+ .getMessage());
+ }
+
+ if (narClassLoaderException != null) {
+ errorMsg.append(
+ " Attempts to load it as a NAR package produced error: " + narClassLoaderException
+ .getMessage());
+ }
+
+ throw new IllegalArgumentException(errorMsg.toString());
+ }
+ }
+ } finally {
+ if (!keepJarClassLoader) {
+ ClassLoaderUtils.closeClassLoader(jarClassLoader);
+ }
+ if (!keepNarClassLoader) {
+ ClassLoaderUtils.closeClassLoader(narClassLoader);
+ }
+ }
+ }
+
+}
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/LoadedFunctionPackage.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/LoadedFunctionPackage.java
new file mode 100644
index 0000000000000..e27ed0eca1973
--- /dev/null
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/LoadedFunctionPackage.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.utils;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import net.bytebuddy.description.type.TypeDescription;
+import net.bytebuddy.dynamic.ClassFileLocator;
+import net.bytebuddy.pool.TypePool;
+import org.apache.pulsar.common.nar.NarClassLoader;
+import org.apache.pulsar.functions.utils.functions.FunctionUtils;
+
+/**
+ * LoadedFunctionPackage is a class that represents a function package and
+ * implements the ValidatableFunctionPackage interface which decouples the
+ * function package from classloading. This implementation is backed by
+ * a ClassLoader, and it is used when the function package is already loaded
+ * by a ClassLoader. This is the case in the LocalRunner and in some of
+ * the unit tests.
+ */
+public class LoadedFunctionPackage implements ValidatableFunctionPackage {
+ private final ClassLoader classLoader;
+ private final Object configMetadata;
+ private final TypePool typePool;
+
+ public LoadedFunctionPackage(ClassLoader classLoader, Class configMetadataClass, T configMetadata) {
+ this.classLoader = classLoader;
+ this.configMetadata = configMetadata;
+ typePool = TypePool.Default.of(
+ ClassFileLocator.ForClassLoader.of(classLoader));
+ }
+
+ public LoadedFunctionPackage(ClassLoader classLoader, Class> configMetadataClass) {
+ this.classLoader = classLoader;
+ if (classLoader instanceof NarClassLoader) {
+ try {
+ configMetadata = FunctionUtils.getPulsarIOServiceConfig((NarClassLoader) classLoader,
+ configMetadataClass);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ } else {
+ configMetadata = null;
+ }
+ typePool = TypePool.Default.of(
+ ClassFileLocator.ForClassLoader.of(classLoader));
+ }
+
+ @Override
+ public TypeDescription resolveType(String className) {
+ return typePool.describe(className).resolve();
+ }
+
+ @Override
+ public TypePool getTypePool() {
+ return typePool;
+ }
+
+ @Override
+ public T getFunctionMetaData(Class clazz) {
+ return configMetadata != null ? clazz.cast(configMetadata) : null;
+ }
+
+ @Override
+ public boolean isEnableClassloading() {
+ return true;
+ }
+
+ @Override
+ public ClassLoader getClassLoader() {
+ return classLoader;
+ }
+}
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
index 7919d69712600..d93676a106d9a 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
@@ -41,23 +41,23 @@
import lombok.Setter;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
+import net.bytebuddy.description.type.TypeDefinition;
+import net.bytebuddy.pool.TypePool;
import org.apache.commons.lang.StringUtils;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.common.functions.ConsumerConfig;
import org.apache.pulsar.common.functions.FunctionConfig;
+import org.apache.pulsar.common.functions.FunctionDefinition;
import org.apache.pulsar.common.functions.Resources;
import org.apache.pulsar.common.io.ConnectorDefinition;
import org.apache.pulsar.common.io.SinkConfig;
import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.nar.NarClassLoader;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.config.validation.ConfigValidation;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.api.utils.IdentityFunction;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
-import org.apache.pulsar.functions.utils.functions.FunctionUtils;
-import org.apache.pulsar.functions.utils.io.ConnectorUtils;
@Slf4j
public class SinkConfigUtils {
@@ -402,8 +402,8 @@ public static SinkConfig convertFromDetails(FunctionDetails functionDetails) {
}
public static ExtractedSinkDetails validateAndExtractDetails(SinkConfig sinkConfig,
- ClassLoader sinkClassLoader,
- ClassLoader functionClassLoader,
+ ValidatableFunctionPackage sinkFunction,
+ ValidatableFunctionPackage transformFunction,
boolean validateConnectorConfig) {
if (isEmpty(sinkConfig.getTenant())) {
throw new IllegalArgumentException("Sink tenant cannot be null");
@@ -443,63 +443,72 @@ public static ExtractedSinkDetails validateAndExtractDetails(SinkConfig sinkConf
// if class name in sink config is not set, this should be a built-in sink
// thus we should try to find it class name in the NAR service definition
if (sinkClassName == null) {
- try {
- sinkClassName = ConnectorUtils.getIOSinkClass((NarClassLoader) sinkClassLoader);
- } catch (IOException e) {
- throw new IllegalArgumentException("Failed to extract sink class from archive", e);
+ ConnectorDefinition connectorDefinition = sinkFunction.getFunctionMetaData(ConnectorDefinition.class);
+ if (connectorDefinition == null) {
+ throw new IllegalArgumentException(
+ "Sink package doesn't contain the META-INF/services/pulsar-io.yaml file.");
+ }
+ sinkClassName = connectorDefinition.getSinkClass();
+ if (sinkClassName == null) {
+ throw new IllegalArgumentException("Failed to extract sink class from archive");
}
}
// check if sink implements the correct interfaces
- Class sinkClass;
+ TypeDefinition sinkClass;
try {
- sinkClass = sinkClassLoader.loadClass(sinkClassName);
- } catch (ClassNotFoundException e) {
+ sinkClass = sinkFunction.resolveType(sinkClassName);
+ } catch (TypePool.Resolution.NoSuchTypeException e) {
throw new IllegalArgumentException(
- String.format("Sink class %s not found in class loader", sinkClassName), e);
+ String.format("Sink class %s not found", sinkClassName), e);
}
String functionClassName = sinkConfig.getTransformFunctionClassName();
- Class> typeArg;
- ClassLoader inputClassLoader;
- if (functionClassLoader != null) {
+ TypeDefinition typeArg;
+ ValidatableFunctionPackage inputFunction;
+ if (transformFunction != null) {
// if function class name in sink config is not set, this should be a built-in function
// thus we should try to find it class name in the NAR service definition
if (functionClassName == null) {
- try {
- functionClassName = FunctionUtils.getFunctionClass(functionClassLoader);
- } catch (IOException e) {
- throw new IllegalArgumentException("Failed to extract function class from archive", e);
+ FunctionDefinition functionDefinition =
+ transformFunction.getFunctionMetaData(FunctionDefinition.class);
+ if (functionDefinition == null) {
+ throw new IllegalArgumentException(
+ "Function package doesn't contain the META-INF/services/pulsar-io.yaml file.");
+ }
+ functionClassName = functionDefinition.getFunctionClass();
+ if (functionClassName == null) {
+ throw new IllegalArgumentException("Transform function class name must be set");
}
}
- Class functionClass;
+ TypeDefinition functionClass;
try {
- functionClass = functionClassLoader.loadClass(functionClassName);
- } catch (ClassNotFoundException e) {
+ functionClass = transformFunction.resolveType(functionClassName);
+ } catch (TypePool.Resolution.NoSuchTypeException e) {
throw new IllegalArgumentException(
- String.format("Function class %s not found in class loader", functionClassName), e);
+ String.format("Function class %s not found", functionClassName), e);
}
// extract type from transform function class
- if (!getRawFunctionTypes(functionClass, false)[1].equals(Record.class)) {
+ if (!getRawFunctionTypes(functionClass, false)[1].asErasure().isAssignableTo(Record.class)) {
throw new IllegalArgumentException("Sink transform function output must be of type Record");
}
typeArg = getFunctionTypes(functionClass, false)[0];
- inputClassLoader = functionClassLoader;
+ inputFunction = transformFunction;
} else {
// extract type from sink class
typeArg = getSinkType(sinkClass);
- inputClassLoader = sinkClassLoader;
+ inputFunction = sinkFunction;
}
if (sinkConfig.getTopicToSerdeClassName() != null) {
for (String serdeClassName : sinkConfig.getTopicToSerdeClassName().values()) {
- ValidatorUtils.validateSerde(serdeClassName, typeArg, inputClassLoader, true);
+ ValidatorUtils.validateSerde(serdeClassName, typeArg, inputFunction.getTypePool(), true);
}
}
if (sinkConfig.getTopicToSchemaType() != null) {
for (String schemaType : sinkConfig.getTopicToSchemaType().values()) {
- ValidatorUtils.validateSchema(schemaType, typeArg, inputClassLoader, true);
+ ValidatorUtils.validateSchema(schemaType, typeArg, inputFunction.getTypePool(), true);
}
}
@@ -512,23 +521,43 @@ public static ExtractedSinkDetails validateAndExtractDetails(SinkConfig sinkConf
throw new IllegalArgumentException("Only one of serdeClassName or schemaType should be set");
}
if (!isEmpty(consumerSpec.getSerdeClassName())) {
- ValidatorUtils.validateSerde(consumerSpec.getSerdeClassName(), typeArg, inputClassLoader, true);
+ ValidatorUtils.validateSerde(consumerSpec.getSerdeClassName(), typeArg,
+ inputFunction.getTypePool(), true);
}
if (!isEmpty(consumerSpec.getSchemaType())) {
- ValidatorUtils.validateSchema(consumerSpec.getSchemaType(), typeArg, inputClassLoader, true);
+ ValidatorUtils.validateSchema(consumerSpec.getSchemaType(), typeArg,
+ inputFunction.getTypePool(), true);
}
if (consumerSpec.getCryptoConfig() != null) {
- ValidatorUtils.validateCryptoKeyReader(consumerSpec.getCryptoConfig(), inputClassLoader, false);
+ ValidatorUtils.validateCryptoKeyReader(consumerSpec.getCryptoConfig(),
+ inputFunction.getTypePool(), false);
}
}
}
- // validate user defined config if enabled and sink is loaded from NAR
- if (validateConnectorConfig && sinkClassLoader instanceof NarClassLoader) {
- validateSinkConfig(sinkConfig, (NarClassLoader) sinkClassLoader);
+ if (sinkConfig.getRetainKeyOrdering() != null
+ && sinkConfig.getRetainKeyOrdering()
+ && sinkConfig.getProcessingGuarantees() != null
+ && sinkConfig.getProcessingGuarantees() == FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) {
+ throw new IllegalArgumentException(
+ "When effectively once processing guarantee is specified, retain Key ordering cannot be set");
+ }
+
+ if (sinkConfig.getRetainKeyOrdering() != null && sinkConfig.getRetainKeyOrdering()
+ && sinkConfig.getRetainOrdering() != null && sinkConfig.getRetainOrdering()) {
+ throw new IllegalArgumentException("Only one of retain ordering or retain key ordering can be set");
}
- return new ExtractedSinkDetails(sinkClassName, typeArg.getName(), functionClassName);
+ // validate user defined config if enabled and classloading is enabled
+ if (validateConnectorConfig) {
+ if (sinkFunction.isEnableClassloading()) {
+ validateSinkConfig(sinkConfig, sinkFunction);
+ } else {
+ log.warn("Skipping annotation based validation of sink config as classloading is disabled");
+ }
+ }
+
+ return new ExtractedSinkDetails(sinkClassName, typeArg.asErasure().getTypeName(), functionClassName);
}
public static Collection collectAllInputTopics(SinkConfig sinkConfig) {
@@ -684,29 +713,13 @@ public static SinkConfig validateUpdate(SinkConfig existingConfig, SinkConfig ne
return mergedConfig;
}
- public static void validateSinkConfig(SinkConfig sinkConfig, NarClassLoader narClassLoader) {
-
- if (sinkConfig.getRetainKeyOrdering() != null
- && sinkConfig.getRetainKeyOrdering()
- && sinkConfig.getProcessingGuarantees() != null
- && sinkConfig.getProcessingGuarantees() == FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) {
- throw new IllegalArgumentException(
- "When effectively once processing guarantee is specified, retain Key ordering cannot be set");
- }
-
- if (sinkConfig.getRetainKeyOrdering() != null && sinkConfig.getRetainKeyOrdering()
- && sinkConfig.getRetainOrdering() != null && sinkConfig.getRetainOrdering()) {
- throw new IllegalArgumentException("Only one of retain ordering or retain key ordering can be set");
- }
-
+ public static void validateSinkConfig(SinkConfig sinkConfig, ValidatableFunctionPackage sinkFunction) {
try {
- ConnectorDefinition defn = ConnectorUtils.getConnectorDefinition(narClassLoader);
- if (defn.getSinkConfigClass() != null) {
- Class configClass = Class.forName(defn.getSinkConfigClass(), true, narClassLoader);
+ ConnectorDefinition defn = sinkFunction.getFunctionMetaData(ConnectorDefinition.class);
+ if (defn != null && defn.getSinkConfigClass() != null) {
+ Class configClass = Class.forName(defn.getSinkConfigClass(), true, sinkFunction.getClassLoader());
validateSinkConfig(sinkConfig, configClass);
}
- } catch (IOException e) {
- throw new IllegalArgumentException("Error validating sink config", e);
} catch (ClassNotFoundException e) {
throw new IllegalArgumentException("Could not find sink config class", e);
}
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
index f3be015d73754..a6430bbea4585 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
@@ -35,7 +35,9 @@
import lombok.Setter;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
-import net.jodah.typetools.TypeResolver;
+import net.bytebuddy.description.type.TypeDefinition;
+import net.bytebuddy.description.type.TypeDescription;
+import net.bytebuddy.pool.TypePool;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.common.functions.ProducerConfig;
import org.apache.pulsar.common.functions.Resources;
@@ -44,13 +46,11 @@
import org.apache.pulsar.common.io.SourceConfig;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.nar.NarClassLoader;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.config.validation.ConfigValidation;
import org.apache.pulsar.functions.api.utils.IdentityFunction;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
-import org.apache.pulsar.functions.utils.io.ConnectorUtils;
import org.apache.pulsar.io.core.BatchSource;
import org.apache.pulsar.io.core.Source;
@@ -294,7 +294,7 @@ public static SourceConfig convertFromDetails(FunctionDetails functionDetails) {
}
public static ExtractedSourceDetails validateAndExtractDetails(SourceConfig sourceConfig,
- ClassLoader sourceClassLoader,
+ ValidatableFunctionPackage sourceFunction,
boolean validateConnectorConfig) {
if (isEmpty(sourceConfig.getTenant())) {
throw new IllegalArgumentException("Source tenant cannot be null");
@@ -319,29 +319,34 @@ public static ExtractedSourceDetails validateAndExtractDetails(SourceConfig sour
// if class name in source config is not set, this should be a built-in source
// thus we should try to find it class name in the NAR service definition
if (sourceClassName == null) {
- try {
- sourceClassName = ConnectorUtils.getIOSourceClass((NarClassLoader) sourceClassLoader);
- } catch (IOException e) {
- throw new IllegalArgumentException("Failed to extract source class from archive", e);
+ ConnectorDefinition connectorDefinition = sourceFunction.getFunctionMetaData(ConnectorDefinition.class);
+ if (connectorDefinition == null) {
+ throw new IllegalArgumentException(
+ "Source package doesn't contain the META-INF/services/pulsar-io.yaml file.");
+ }
+ sourceClassName = connectorDefinition.getSourceClass();
+ if (sourceClassName == null) {
+ throw new IllegalArgumentException("Failed to extract source class from archive");
}
}
// check if source implements the correct interfaces
- Class sourceClass;
+ TypeDescription sourceClass;
try {
- sourceClass = sourceClassLoader.loadClass(sourceClassName);
- } catch (ClassNotFoundException e) {
+ sourceClass = sourceFunction.resolveType(sourceClassName);
+ } catch (TypePool.Resolution.NoSuchTypeException e) {
throw new IllegalArgumentException(
String.format("Source class %s not found in class loader", sourceClassName), e);
}
- if (!Source.class.isAssignableFrom(sourceClass) && !BatchSource.class.isAssignableFrom(sourceClass)) {
+ if (!(sourceClass.asErasure().isAssignableTo(Source.class) || sourceClass.asErasure()
+ .isAssignableTo(BatchSource.class))) {
throw new IllegalArgumentException(
- String.format("Source class %s does not implement the correct interface",
- sourceClass.getName()));
+ String.format("Source class %s does not implement the correct interface",
+ sourceClass.getName()));
}
- if (BatchSource.class.isAssignableFrom(sourceClass)) {
+ if (sourceClass.asErasure().isAssignableTo(BatchSource.class)) {
if (sourceConfig.getBatchSourceConfig() != null) {
validateBatchSourceConfig(sourceConfig.getBatchSourceConfig());
} else {
@@ -352,7 +357,14 @@ public static ExtractedSourceDetails validateAndExtractDetails(SourceConfig sour
}
// extract type from source class
- Class> typeArg = getSourceType(sourceClass);
+ TypeDefinition typeArg;
+
+ try {
+ typeArg = getSourceType(sourceClass);
+ } catch (Exception e) {
+ throw new IllegalArgumentException(
+ String.format("Failed to resolve type for Source class %s", sourceClassName), e);
+ }
// Only one of serdeClassName or schemaType should be set
if (!StringUtils.isEmpty(sourceConfig.getSerdeClassName()) && !StringUtils
@@ -361,29 +373,30 @@ public static ExtractedSourceDetails validateAndExtractDetails(SourceConfig sour
}
if (!StringUtils.isEmpty(sourceConfig.getSerdeClassName())) {
- ValidatorUtils.validateSerde(sourceConfig.getSerdeClassName(), typeArg, sourceClassLoader, false);
+ ValidatorUtils.validateSerde(sourceConfig.getSerdeClassName(), typeArg, sourceFunction.getTypePool(),
+ false);
}
if (!StringUtils.isEmpty(sourceConfig.getSchemaType())) {
- ValidatorUtils.validateSchema(sourceConfig.getSchemaType(), typeArg, sourceClassLoader, false);
+ ValidatorUtils.validateSchema(sourceConfig.getSchemaType(), typeArg, sourceFunction.getTypePool(),
+ false);
}
if (sourceConfig.getProducerConfig() != null && sourceConfig.getProducerConfig().getCryptoConfig() != null) {
ValidatorUtils
- .validateCryptoKeyReader(sourceConfig.getProducerConfig().getCryptoConfig(), sourceClassLoader,
- true);
+ .validateCryptoKeyReader(sourceConfig.getProducerConfig().getCryptoConfig(),
+ sourceFunction.getTypePool(), true);
}
- if (typeArg.equals(TypeResolver.Unknown.class)) {
- throw new IllegalArgumentException(
- String.format("Failed to resolve type for Source class %s", sourceClassName));
- }
-
- // validate user defined config if enabled and source is loaded from NAR
- if (validateConnectorConfig && sourceClassLoader instanceof NarClassLoader) {
- validateSourceConfig(sourceConfig, (NarClassLoader) sourceClassLoader);
+ // validate user defined config if enabled and classloading is enabled
+ if (validateConnectorConfig) {
+ if (sourceFunction.isEnableClassloading()) {
+ validateSourceConfig(sourceConfig, sourceFunction);
+ } else {
+ log.warn("Skipping annotation based validation of sink config as classloading is disabled");
+ }
}
- return new ExtractedSourceDetails(sourceClassName, typeArg.getName());
+ return new ExtractedSourceDetails(sourceClassName, typeArg.asErasure().getTypeName());
}
@SneakyThrows
@@ -524,15 +537,14 @@ public static void validateBatchSourceConfigUpdate(BatchSourceConfig existingCon
}
}
- public static void validateSourceConfig(SourceConfig sourceConfig, NarClassLoader narClassLoader) {
+ public static void validateSourceConfig(SourceConfig sourceConfig, ValidatableFunctionPackage sourceFunction) {
try {
- ConnectorDefinition defn = ConnectorUtils.getConnectorDefinition(narClassLoader);
- if (defn.getSourceConfigClass() != null) {
- Class configClass = Class.forName(defn.getSourceConfigClass(), true, narClassLoader);
+ ConnectorDefinition defn = sourceFunction.getFunctionMetaData(ConnectorDefinition.class);
+ if (defn != null && defn.getSourceConfigClass() != null) {
+ Class configClass =
+ Class.forName(defn.getSourceConfigClass(), true, sourceFunction.getClassLoader());
validateSourceConfig(sourceConfig, configClass);
}
- } catch (IOException e) {
- throw new IllegalArgumentException("Error validating source config", e);
} catch (ClassNotFoundException e) {
throw new IllegalArgumentException("Could not find source config class");
}
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/ValidatableFunctionPackage.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/ValidatableFunctionPackage.java
new file mode 100644
index 0000000000000..8d5aefb6f6785
--- /dev/null
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/ValidatableFunctionPackage.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.utils;
+
+import net.bytebuddy.description.type.TypeDescription;
+import net.bytebuddy.pool.TypePool;
+
+/**
+ * This abstraction separates the function and connector definition from classloading,
+ * enabling validation without the need for classloading. It utilizes Byte Buddy for
+ * type and annotation resolution.
+ *
+ * The function or connector definition is directly extracted from the archive file,
+ * eliminating the need for classloader initialization.
+ *
+ * The getClassLoader method should only be invoked when classloading is enabled.
+ * Classloading is required in the LocalRunner and in the Functions worker when the
+ * worker is configured with the 'validateConnectorConfig' set to true.
+ */
+public interface ValidatableFunctionPackage {
+ /**
+ * Resolves the type description for the given class name within the function package.
+ */
+ TypeDescription resolveType(String className);
+ /**
+ * Returns the Byte Buddy TypePool instance for the function package.
+ */
+ TypePool getTypePool();
+ /**
+ * Returns the function or connector definition metadata.
+ * Supports FunctionDefinition and ConnectorDefinition as the metadata type.
+ */
+ T getFunctionMetaData(Class clazz);
+ /**
+ * Returns if classloading is enabled for the function package.
+ */
+ boolean isEnableClassloading();
+ /**
+ * Returns the classloader for the function package. The classloader is
+ * lazily initialized when classloading is enabled.
+ */
+ ClassLoader getClassLoader();
+}
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/ValidatorUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/ValidatorUtils.java
index 390671c5606af..8df6a3f261a6e 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/ValidatorUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/ValidatorUtils.java
@@ -18,35 +18,40 @@
*/
package org.apache.pulsar.functions.utils;
-import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.commons.lang3.StringUtils.isEmpty;
-import static org.apache.commons.lang3.StringUtils.isNotBlank;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
-import net.jodah.typetools.TypeResolver;
+import net.bytebuddy.description.type.TypeDefinition;
+import net.bytebuddy.description.type.TypeDescription;
+import net.bytebuddy.pool.TypePool;
import org.apache.pulsar.client.api.CryptoKeyReader;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.functions.CryptoConfig;
import org.apache.pulsar.common.schema.SchemaType;
-import org.apache.pulsar.common.util.ClassLoaderUtils;
-import org.apache.pulsar.common.util.Reflections;
import org.apache.pulsar.functions.api.SerDe;
-import org.apache.pulsar.functions.proto.Function;
-import org.apache.pulsar.io.core.Sink;
-import org.apache.pulsar.io.core.Source;
@Slf4j
public class ValidatorUtils {
private static final String DEFAULT_SERDE = "org.apache.pulsar.functions.api.utils.DefaultSerDe";
- public static void validateSchema(String schemaType, Class> typeArg, ClassLoader clsLoader,
+ public static void validateSchema(String schemaType, TypeDefinition typeArg, TypePool typePool,
boolean input) {
if (isEmpty(schemaType) || getBuiltinSchemaType(schemaType) != null) {
// If it's empty, we use the default schema and no need to validate
// If it's built-in, no need to validate
} else {
- ClassLoaderUtils.implementsClass(schemaType, Schema.class, clsLoader);
- validateSchemaType(schemaType, typeArg, clsLoader, input);
+ TypeDescription schemaClass = null;
+ try {
+ schemaClass = typePool.describe(schemaType).resolve();
+ } catch (TypePool.Resolution.NoSuchTypeException e) {
+ throw new IllegalArgumentException(
+ String.format("The schema class %s does not exist", schemaType));
+ }
+ if (!schemaClass.asErasure().isAssignableTo(Schema.class)) {
+ throw new IllegalArgumentException(
+ String.format("%s does not implement %s", schemaType, Schema.class.getName()));
+ }
+ validateSchemaType(schemaClass, typeArg, typePool, input);
}
}
@@ -60,29 +65,32 @@ private static SchemaType getBuiltinSchemaType(String schemaTypeOrClassName) {
}
- public static void validateCryptoKeyReader(CryptoConfig conf, ClassLoader classLoader, boolean isProducer) {
+ public static void validateCryptoKeyReader(CryptoConfig conf, TypePool typePool, boolean isProducer) {
if (isEmpty(conf.getCryptoKeyReaderClassName())) {
return;
}
- Class> cryptoClass;
+ String cryptoClassName = conf.getCryptoKeyReaderClassName();
+ TypeDescription cryptoClass = null;
try {
- cryptoClass = ClassLoaderUtils.loadClass(conf.getCryptoKeyReaderClassName(), classLoader);
- } catch (ClassNotFoundException | NoClassDefFoundError e) {
+ cryptoClass = typePool.describe(cryptoClassName).resolve();
+ } catch (TypePool.Resolution.NoSuchTypeException e) {
throw new IllegalArgumentException(
- String.format("The crypto key reader class %s does not exist", conf.getCryptoKeyReaderClassName()));
+ String.format("The crypto key reader class %s does not exist", cryptoClassName));
+ }
+ if (!cryptoClass.asErasure().isAssignableTo(CryptoKeyReader.class)) {
+ throw new IllegalArgumentException(
+ String.format("%s does not implement %s", cryptoClassName, CryptoKeyReader.class.getName()));
}
- ClassLoaderUtils.implementsClass(conf.getCryptoKeyReaderClassName(), CryptoKeyReader.class, classLoader);
- try {
- cryptoClass.getConstructor(Map.class);
- } catch (NoSuchMethodException ex) {
+ boolean hasConstructor = cryptoClass.getDeclaredMethods().stream()
+ .anyMatch(method -> method.isConstructor() && method.getParameters().size() == 1
+ && method.getParameters().get(0).getType().asErasure().represents(Map.class));
+
+ if (!hasConstructor) {
throw new IllegalArgumentException(
String.format("The crypto key reader class %s does not implement the desired constructor.",
conf.getCryptoKeyReaderClassName()));
-
- } catch (SecurityException e) {
- throw new IllegalArgumentException("Failed to access crypto key reader class", e);
}
if (isProducer && (conf.getEncryptionKeys() == null || conf.getEncryptionKeys().length == 0)) {
@@ -90,7 +98,7 @@ public static void validateCryptoKeyReader(CryptoConfig conf, ClassLoader classL
}
}
- public static void validateSerde(String inputSerializer, Class> typeArg, ClassLoader clsLoader,
+ public static void validateSerde(String inputSerializer, TypeDefinition typeArg, TypePool typePool,
boolean deser) {
if (isEmpty(inputSerializer)) {
return;
@@ -98,154 +106,53 @@ public static void validateSerde(String inputSerializer, Class> typeArg, Class
if (inputSerializer.equals(DEFAULT_SERDE)) {
return;
}
+ TypeDescription serdeClass;
try {
- Class> serdeClass = ClassLoaderUtils.loadClass(inputSerializer, clsLoader);
- } catch (ClassNotFoundException | NoClassDefFoundError e) {
+ serdeClass = typePool.describe(inputSerializer).resolve();
+ } catch (TypePool.Resolution.NoSuchTypeException e) {
throw new IllegalArgumentException(
String.format("The input serialization/deserialization class %s does not exist",
inputSerializer));
}
- ClassLoaderUtils.implementsClass(inputSerializer, SerDe.class, clsLoader);
-
- SerDe serDe = (SerDe) Reflections.createInstance(inputSerializer, clsLoader);
- if (serDe == null) {
- throw new IllegalArgumentException(String.format("The SerDe class %s does not exist",
- inputSerializer));
- }
- Class>[] serDeTypes = TypeResolver.resolveRawArguments(SerDe.class, serDe.getClass());
-
- // type inheritance information seems to be lost in generic type
- // load the actual type class for verification
- Class> fnInputClass;
- Class> serdeInputClass;
- try {
- fnInputClass = Class.forName(typeArg.getName(), true, clsLoader);
- serdeInputClass = Class.forName(serDeTypes[0].getName(), true, clsLoader);
- } catch (ClassNotFoundException | NoClassDefFoundError e) {
- throw new IllegalArgumentException("Failed to load type class", e);
- }
+ TypeDescription.Generic serDeTypeArg = serdeClass.getInterfaces().stream()
+ .filter(i -> i.asErasure().isAssignableTo(SerDe.class))
+ .findFirst()
+ .map(i -> i.getTypeArguments().get(0))
+ .orElseThrow(() -> new IllegalArgumentException(
+ String.format("%s does not implement %s", inputSerializer, SerDe.class.getName())));
if (deser) {
- if (!fnInputClass.isAssignableFrom(serdeInputClass)) {
- throw new IllegalArgumentException("Serializer type mismatch " + typeArg + " vs " + serDeTypes[0]);
+ if (!serDeTypeArg.asErasure().isAssignableTo(typeArg.asErasure())) {
+ throw new IllegalArgumentException("Serializer type mismatch " + typeArg.getActualName() + " vs "
+ + serDeTypeArg.getActualName());
}
} else {
- if (!serdeInputClass.isAssignableFrom(fnInputClass)) {
- throw new IllegalArgumentException("Serializer type mismatch " + typeArg + " vs " + serDeTypes[0]);
+ if (!serDeTypeArg.asErasure().isAssignableFrom(typeArg.asErasure())) {
+ throw new IllegalArgumentException("Serializer type mismatch " + typeArg.getActualName() + " vs "
+ + serDeTypeArg.getActualName());
}
}
}
- private static void validateSchemaType(String schemaClassName, Class> typeArg, ClassLoader clsLoader,
+ private static void validateSchemaType(TypeDefinition schema, TypeDefinition typeArg, TypePool typePool,
boolean input) {
- Schema> schema = (Schema>) Reflections.createInstance(schemaClassName, clsLoader);
- if (schema == null) {
- throw new IllegalArgumentException(String.format("The Schema class %s does not exist",
- schemaClassName));
- }
- Class>[] schemaTypes = TypeResolver.resolveRawArguments(Schema.class, schema.getClass());
- // type inheritance information seems to be lost in generic type
- // load the actual type class for verification
- Class> fnInputClass;
- Class> schemaInputClass;
- try {
- fnInputClass = Class.forName(typeArg.getName(), true, clsLoader);
- schemaInputClass = Class.forName(schemaTypes[0].getName(), true, clsLoader);
- } catch (ClassNotFoundException | NoClassDefFoundError e) {
- throw new IllegalArgumentException("Failed to load type class", e);
- }
+ TypeDescription.Generic schemaTypeArg = schema.getInterfaces().stream()
+ .filter(i -> i.asErasure().isAssignableTo(Schema.class))
+ .findFirst()
+ .map(i -> i.getTypeArguments().get(0))
+ .orElse(null);
if (input) {
- if (!fnInputClass.isAssignableFrom(schemaInputClass)) {
+ if (!schemaTypeArg.asErasure().isAssignableTo(typeArg.asErasure())) {
throw new IllegalArgumentException(
- "Schema type mismatch " + typeArg + " vs " + schemaTypes[0]);
+ "Schema type mismatch " + typeArg.getActualName() + " vs " + schemaTypeArg.getActualName());
}
} else {
- if (!schemaInputClass.isAssignableFrom(fnInputClass)) {
+ if (!schemaTypeArg.asErasure().isAssignableFrom(typeArg.asErasure())) {
throw new IllegalArgumentException(
- "Schema type mismatch " + typeArg + " vs " + schemaTypes[0]);
- }
- }
- }
-
-
- public static void validateFunctionClassTypes(ClassLoader classLoader,
- Function.FunctionDetails.Builder functionDetailsBuilder) {
-
- // validate only if classLoader is provided
- if (classLoader == null) {
- return;
- }
-
- if (isBlank(functionDetailsBuilder.getClassName())) {
- throw new IllegalArgumentException("Function class-name can't be empty");
- }
-
- // validate function class-type
- Class functionClass;
- try {
- functionClass = classLoader.loadClass(functionDetailsBuilder.getClassName());
- } catch (ClassNotFoundException | NoClassDefFoundError e) {
- throw new IllegalArgumentException(
- String.format("Function class %s must be in class path", functionDetailsBuilder.getClassName()), e);
- }
- Class>[] typeArgs = FunctionCommon.getFunctionTypes(functionClass, false);
-
- if (!(org.apache.pulsar.functions.api.Function.class.isAssignableFrom(functionClass))
- && !(java.util.function.Function.class.isAssignableFrom(functionClass))) {
- throw new RuntimeException("User class must either be Function or java.util.Function");
- }
-
- if (functionDetailsBuilder.hasSource() && functionDetailsBuilder.getSource() != null
- && isNotBlank(functionDetailsBuilder.getSource().getClassName())) {
- try {
- String sourceClassName = functionDetailsBuilder.getSource().getClassName();
- String argClassName = FunctionCommon.getTypeArg(sourceClassName, Source.class, classLoader).getName();
- functionDetailsBuilder
- .setSource(functionDetailsBuilder.getSourceBuilder().setTypeClassName(argClassName));
-
- // if sink-class not present then set same arg as source
- if (!functionDetailsBuilder.hasSink() || isBlank(functionDetailsBuilder.getSink().getClassName())) {
- functionDetailsBuilder
- .setSink(functionDetailsBuilder.getSinkBuilder().setTypeClassName(argClassName));
- }
-
- } catch (IllegalArgumentException ie) {
- throw ie;
- } catch (Exception e) {
- log.error("Failed to validate source class", e);
- throw new IllegalArgumentException("Failed to validate source class-name", e);
- }
- } else if (isBlank(functionDetailsBuilder.getSourceBuilder().getTypeClassName())) {
- // if function-src-class is not present then set function-src type-class according to function class
- functionDetailsBuilder
- .setSource(functionDetailsBuilder.getSourceBuilder().setTypeClassName(typeArgs[0].getName()));
- }
-
- if (functionDetailsBuilder.hasSink() && functionDetailsBuilder.getSink() != null
- && isNotBlank(functionDetailsBuilder.getSink().getClassName())) {
- try {
- String sinkClassName = functionDetailsBuilder.getSink().getClassName();
- String argClassName = FunctionCommon.getTypeArg(sinkClassName, Sink.class, classLoader).getName();
- functionDetailsBuilder.setSink(functionDetailsBuilder.getSinkBuilder().setTypeClassName(argClassName));
-
- // if source-class not present then set same arg as sink
- if (!functionDetailsBuilder.hasSource() || isBlank(functionDetailsBuilder.getSource().getClassName())) {
- functionDetailsBuilder
- .setSource(functionDetailsBuilder.getSourceBuilder().setTypeClassName(argClassName));
- }
-
- } catch (IllegalArgumentException ie) {
- throw ie;
- } catch (Exception e) {
- log.error("Failed to validate sink class", e);
- throw new IllegalArgumentException("Failed to validate sink class-name", e);
+ "Schema type mismatch " + typeArg.getActualName() + " vs " + schemaTypeArg.getActualName());
}
- } else if (isBlank(functionDetailsBuilder.getSinkBuilder().getTypeClassName())) {
- // if function-sink-class is not present then set function-sink type-class according to function class
- functionDetailsBuilder
- .setSink(functionDetailsBuilder.getSinkBuilder().setTypeClassName(typeArgs[1].getName()));
}
}
}
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functions/FunctionArchive.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functions/FunctionArchive.java
index 028b57d69c86b..cfb213f34ed72 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functions/FunctionArchive.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functions/FunctionArchive.java
@@ -19,14 +19,50 @@
package org.apache.pulsar.functions.utils.functions;
import java.nio.file.Path;
-import lombok.Builder;
-import lombok.Data;
import org.apache.pulsar.common.functions.FunctionDefinition;
+import org.apache.pulsar.functions.utils.FunctionFilePackage;
+import org.apache.pulsar.functions.utils.ValidatableFunctionPackage;
-@Builder
-@Data
-public class FunctionArchive {
- private Path archivePath;
- private ClassLoader classLoader;
- private FunctionDefinition functionDefinition;
+public class FunctionArchive implements AutoCloseable {
+ private final Path archivePath;
+ private final FunctionDefinition functionDefinition;
+ private final String narExtractionDirectory;
+ private final boolean enableClassloading;
+ private ValidatableFunctionPackage functionPackage;
+ private boolean closed;
+
+ public FunctionArchive(Path archivePath, FunctionDefinition functionDefinition, String narExtractionDirectory,
+ boolean enableClassloading) {
+ this.archivePath = archivePath;
+ this.functionDefinition = functionDefinition;
+ this.narExtractionDirectory = narExtractionDirectory;
+ this.enableClassloading = enableClassloading;
+ }
+
+ public Path getArchivePath() {
+ return archivePath;
+ }
+
+ public synchronized ValidatableFunctionPackage getFunctionPackage() {
+ if (closed) {
+ throw new IllegalStateException("FunctionArchive is already closed");
+ }
+ if (functionPackage == null) {
+ functionPackage = new FunctionFilePackage(archivePath.toFile(), narExtractionDirectory, enableClassloading,
+ FunctionDefinition.class);
+ }
+ return functionPackage;
+ }
+
+ public FunctionDefinition getFunctionDefinition() {
+ return functionDefinition;
+ }
+
+ @Override
+ public synchronized void close() throws Exception {
+ closed = true;
+ if (functionPackage instanceof AutoCloseable) {
+ ((AutoCloseable) functionPackage).close();
+ }
+ }
}
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functions/FunctionUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functions/FunctionUtils.java
index 941df573e495e..31a5540e0bfaf 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functions/FunctionUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functions/FunctionUtils.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.pulsar.functions.utils.functions;
import java.io.File;
@@ -30,10 +31,8 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.common.functions.FunctionDefinition;
import org.apache.pulsar.common.nar.NarClassLoader;
-import org.apache.pulsar.common.nar.NarClassLoaderBuilder;
import org.apache.pulsar.common.util.ObjectMapperFactory;
-import org.apache.pulsar.functions.api.Function;
-import org.apache.pulsar.functions.utils.Exceptions;
+import org.zeroturnaround.zip.ZipUtil;
@UtilityClass
@@ -45,43 +44,40 @@ public class FunctionUtils {
/**
* Extract the Pulsar Function class from a function or archive.
*/
- public static String getFunctionClass(ClassLoader classLoader) throws IOException {
- NarClassLoader ncl = (NarClassLoader) classLoader;
- String configStr = ncl.getServiceDefinition(PULSAR_IO_SERVICE_NAME);
-
- FunctionDefinition conf = ObjectMapperFactory.getYamlMapper().reader().readValue(configStr,
- FunctionDefinition.class);
- if (StringUtils.isEmpty(conf.getFunctionClass())) {
- throw new IOException(
- String.format("The '%s' functionctor does not provide a function implementation", conf.getName()));
- }
+ public static String getFunctionClass(File narFile) throws IOException {
+ return getFunctionDefinition(narFile).getFunctionClass();
+ }
- try {
- // Try to load source class and check it implements Function interface
- Class functionClass = ncl.loadClass(conf.getFunctionClass());
- if (!(Function.class.isAssignableFrom(functionClass))) {
- throw new IOException(
- "Class " + conf.getFunctionClass() + " does not implement interface " + Function.class
- .getName());
- }
- } catch (Throwable t) {
- Exceptions.rethrowIOException(t);
+ public static FunctionDefinition getFunctionDefinition(File narFile) throws IOException {
+ return getPulsarIOServiceConfig(narFile, FunctionDefinition.class);
+ }
+
+ public static T getPulsarIOServiceConfig(File narFile, Class valueType) throws IOException {
+ String filename = "META-INF/services/" + PULSAR_IO_SERVICE_NAME;
+ byte[] configEntry = ZipUtil.unpackEntry(narFile, filename);
+ if (configEntry != null) {
+ return ObjectMapperFactory.getYamlMapper().reader().readValue(configEntry, valueType);
+ } else {
+ return null;
}
+ }
- return conf.getFunctionClass();
+ public static String getFunctionClass(NarClassLoader narClassLoader) throws IOException {
+ return getFunctionDefinition(narClassLoader).getFunctionClass();
}
public static FunctionDefinition getFunctionDefinition(NarClassLoader narClassLoader) throws IOException {
- String configStr = narClassLoader.getServiceDefinition(PULSAR_IO_SERVICE_NAME);
- return ObjectMapperFactory.getYamlMapper().reader().readValue(configStr, FunctionDefinition.class);
+ return getPulsarIOServiceConfig(narClassLoader, FunctionDefinition.class);
}
- public static TreeMap searchForFunctions(String functionsDirectory) throws IOException {
- return searchForFunctions(functionsDirectory, false);
+ public static T getPulsarIOServiceConfig(NarClassLoader narClassLoader, Class valueType) throws IOException {
+ return ObjectMapperFactory.getYamlMapper().reader()
+ .readValue(narClassLoader.getServiceDefinition(PULSAR_IO_SERVICE_NAME), valueType);
}
public static TreeMap searchForFunctions(String functionsDirectory,
- boolean alwaysPopulatePath) throws IOException {
+ String narExtractionDirectory,
+ boolean enableClassloading) throws IOException {
Path path = Paths.get(functionsDirectory).toAbsolutePath();
log.info("Searching for functions in {}", path);
@@ -95,22 +91,12 @@ public static TreeMap searchForFunctions(String functio
try (DirectoryStream stream = Files.newDirectoryStream(path, "*.nar")) {
for (Path archive : stream) {
try {
-
- NarClassLoader ncl = NarClassLoaderBuilder.builder()
- .narFile(new File(archive.toString()))
- .build();
-
- FunctionArchive.FunctionArchiveBuilder functionArchiveBuilder = FunctionArchive.builder();
- FunctionDefinition cntDef = FunctionUtils.getFunctionDefinition(ncl);
+ FunctionDefinition cntDef = FunctionUtils.getFunctionDefinition(archive.toFile());
log.info("Found function {} from {}", cntDef, archive);
-
- functionArchiveBuilder.archivePath(archive);
-
- functionArchiveBuilder.classLoader(ncl);
- functionArchiveBuilder.functionDefinition(cntDef);
-
- if (alwaysPopulatePath || !StringUtils.isEmpty(cntDef.getFunctionClass())) {
- functions.put(cntDef.getName(), functionArchiveBuilder.build());
+ if (!StringUtils.isEmpty(cntDef.getFunctionClass())) {
+ FunctionArchive functionArchive =
+ new FunctionArchive(archive, cntDef, narExtractionDirectory, enableClassloading);
+ functions.put(cntDef.getName(), functionArchive);
}
} catch (Throwable t) {
log.warn("Failed to load function from {}", archive, t);
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/Connector.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/Connector.java
index f1a03f4424ec6..5fcc22747c516 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/Connector.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/Connector.java
@@ -20,17 +20,79 @@
import java.nio.file.Path;
import java.util.List;
-import lombok.Builder;
-import lombok.Data;
+import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.common.io.ConfigFieldDefinition;
import org.apache.pulsar.common.io.ConnectorDefinition;
+import org.apache.pulsar.functions.utils.FunctionFilePackage;
+import org.apache.pulsar.functions.utils.ValidatableFunctionPackage;
-@Builder
-@Data
-public class Connector {
- private Path archivePath;
+public class Connector implements AutoCloseable {
+ private final Path archivePath;
+ private final String narExtractionDirectory;
+ private final boolean enableClassloading;
+ private ValidatableFunctionPackage connectorFunctionPackage;
private List sourceConfigFieldDefinitions;
private List sinkConfigFieldDefinitions;
- private ClassLoader classLoader;
private ConnectorDefinition connectorDefinition;
+ private boolean closed;
+
+ public Connector(Path archivePath, ConnectorDefinition connectorDefinition, String narExtractionDirectory,
+ boolean enableClassloading) {
+ this.archivePath = archivePath;
+ this.connectorDefinition = connectorDefinition;
+ this.narExtractionDirectory = narExtractionDirectory;
+ this.enableClassloading = enableClassloading;
+ }
+
+ public Path getArchivePath() {
+ return archivePath;
+ }
+
+ public synchronized ValidatableFunctionPackage getConnectorFunctionPackage() {
+ checkState();
+ if (connectorFunctionPackage == null) {
+ connectorFunctionPackage =
+ new FunctionFilePackage(archivePath.toFile(), narExtractionDirectory, enableClassloading,
+ ConnectorDefinition.class);
+ }
+ return connectorFunctionPackage;
+ }
+
+ private void checkState() {
+ if (closed) {
+ throw new IllegalStateException("Connector is already closed");
+ }
+ }
+
+ public synchronized List getSourceConfigFieldDefinitions() {
+ checkState();
+ if (sourceConfigFieldDefinitions == null && !StringUtils.isEmpty(connectorDefinition.getSourceClass())
+ && !StringUtils.isEmpty(connectorDefinition.getSourceConfigClass())) {
+ sourceConfigFieldDefinitions = ConnectorUtils.getConnectorConfigDefinition(getConnectorFunctionPackage(),
+ connectorDefinition.getSourceConfigClass());
+ }
+ return sourceConfigFieldDefinitions;
+ }
+
+ public synchronized List getSinkConfigFieldDefinitions() {
+ checkState();
+ if (sinkConfigFieldDefinitions == null && !StringUtils.isEmpty(connectorDefinition.getSinkClass())
+ && !StringUtils.isEmpty(connectorDefinition.getSinkConfigClass())) {
+ sinkConfigFieldDefinitions = ConnectorUtils.getConnectorConfigDefinition(getConnectorFunctionPackage(),
+ connectorDefinition.getSinkConfigClass());
+ }
+ return sinkConfigFieldDefinitions;
+ }
+
+ public ConnectorDefinition getConnectorDefinition() {
+ return connectorDefinition;
+ }
+
+ @Override
+ public synchronized void close() throws Exception {
+ closed = true;
+ if (connectorFunctionPackage instanceof AutoCloseable) {
+ ((AutoCloseable) connectorFunctionPackage).close();
+ }
+ }
}
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java
index a814bf35548f3..df1310965f392 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java
@@ -18,38 +18,31 @@
*/
package org.apache.pulsar.functions.utils.io;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.File;
import java.io.IOException;
-import java.lang.annotation.Annotation;
-import java.lang.reflect.Field;
-import java.lang.reflect.Method;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
-import java.util.AbstractMap;
-import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.stream.Collectors;
import lombok.experimental.UtilityClass;
import lombok.extern.slf4j.Slf4j;
+import net.bytebuddy.description.annotation.AnnotationDescription;
+import net.bytebuddy.description.annotation.AnnotationValue;
+import net.bytebuddy.description.field.FieldDescription;
+import net.bytebuddy.description.method.MethodDescription;
+import net.bytebuddy.description.type.TypeDefinition;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.common.io.ConfigFieldDefinition;
import org.apache.pulsar.common.io.ConnectorDefinition;
import org.apache.pulsar.common.nar.NarClassLoader;
-import org.apache.pulsar.common.nar.NarClassLoaderBuilder;
-import org.apache.pulsar.common.util.FutureUtil;
-import org.apache.pulsar.common.util.ObjectMapperFactory;
-import org.apache.pulsar.common.util.Reflections;
import org.apache.pulsar.functions.utils.Exceptions;
+import org.apache.pulsar.functions.utils.ValidatableFunctionPackage;
+import org.apache.pulsar.functions.utils.functions.FunctionUtils;
import org.apache.pulsar.io.core.BatchSource;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.Source;
@@ -76,7 +69,7 @@ public static String getIOSourceClass(NarClassLoader narClassLoader) throws IOEx
Class sourceClass = narClassLoader.loadClass(conf.getSourceClass());
if (!(Source.class.isAssignableFrom(sourceClass) || BatchSource.class.isAssignableFrom(sourceClass))) {
throw new IOException(String.format("Class %s does not implement interface %s or %s",
- conf.getSourceClass(), Source.class.getName(), BatchSource.class.getName()));
+ conf.getSourceClass(), Source.class.getName(), BatchSource.class.getName()));
}
} catch (Throwable t) {
Exceptions.rethrowIOException(t);
@@ -109,32 +102,36 @@ public static String getIOSinkClass(NarClassLoader narClassLoader) throws IOExce
return conf.getSinkClass();
}
- public static ConnectorDefinition getConnectorDefinition(NarClassLoader narClassLoader) throws IOException {
- String configStr = narClassLoader.getServiceDefinition(PULSAR_IO_SERVICE_NAME);
+ public static ConnectorDefinition getConnectorDefinition(File narFile) throws IOException {
+ return FunctionUtils.getPulsarIOServiceConfig(narFile, ConnectorDefinition.class);
+ }
- return ObjectMapperFactory.getYamlMapper().reader().readValue(configStr, ConnectorDefinition.class);
+ public static ConnectorDefinition getConnectorDefinition(NarClassLoader narClassLoader) throws IOException {
+ return FunctionUtils.getPulsarIOServiceConfig(narClassLoader, ConnectorDefinition.class);
}
- public static List getConnectorConfigDefinition(ClassLoader classLoader,
- String configClassName) throws Exception {
+ public static List getConnectorConfigDefinition(
+ ValidatableFunctionPackage connectorFunctionPackage,
+ String configClassName) {
List retval = new LinkedList<>();
- Class configClass = classLoader.loadClass(configClassName);
- for (Field field : Reflections.getAllFields(configClass)) {
- if (java.lang.reflect.Modifier.isStatic(field.getModifiers())) {
- // We dont want static fields
+ TypeDefinition configClass = connectorFunctionPackage.resolveType(configClassName);
+
+ for (FieldDescription field : getAllFields(configClass)) {
+ if (field.isStatic()) {
+ // We don't want static fields
continue;
}
- field.setAccessible(true);
ConfigFieldDefinition configFieldDefinition = new ConfigFieldDefinition();
configFieldDefinition.setFieldName(field.getName());
- configFieldDefinition.setTypeName(field.getType().getName());
+ configFieldDefinition.setTypeName(field.getType().getActualName());
Map attributes = new HashMap<>();
- for (Annotation annotation : field.getAnnotations()) {
- if (annotation.annotationType().equals(FieldDoc.class)) {
- FieldDoc fieldDoc = (FieldDoc) annotation;
- for (Method method : FieldDoc.class.getDeclaredMethods()) {
- Object value = method.invoke(fieldDoc);
- attributes.put(method.getName(), value == null ? "" : value.toString());
+ for (AnnotationDescription annotation : field.getDeclaredAnnotations()) {
+ if (annotation.getAnnotationType().represents(FieldDoc.class)) {
+ for (MethodDescription.InDefinedShape method : annotation.getAnnotationType()
+ .getDeclaredMethods()) {
+ AnnotationValue, ?> value = annotation.getValue(method.getName());
+ attributes.put(method.getName(),
+ value == null || value.resolve() == null ? "" : value.resolve().toString());
}
}
}
@@ -145,86 +142,42 @@ public static List getConnectorConfigDefinition(ClassLoad
return retval;
}
+ private static List getAllFields(TypeDefinition type) {
+ List fields = new LinkedList<>();
+ fields.addAll(type.getDeclaredFields());
+
+ if (type.getSuperClass() != null) {
+ fields.addAll(getAllFields(type.getSuperClass()));
+ }
+
+ return fields;
+ }
+
public static TreeMap searchForConnectors(String connectorsDirectory,
- String narExtractionDirectory) throws IOException {
+ String narExtractionDirectory,
+ boolean enableClassloading) throws IOException {
Path path = Paths.get(connectorsDirectory).toAbsolutePath();
log.info("Searching for connectors in {}", path);
+ TreeMap connectors = new TreeMap<>();
+
if (!path.toFile().exists()) {
log.warn("Connectors archive directory not found");
- return new TreeMap<>();
+ return connectors;
}
- List archives = new ArrayList<>();
try (DirectoryStream stream = Files.newDirectoryStream(path, "*.nar")) {
for (Path archive : stream) {
- archives.add(archive);
- }
- }
- if (archives.isEmpty()) {
- return new TreeMap<>();
- }
-
- ExecutorService oneTimeExecutor = null;
- try {
- int nThreads = Math.min(Runtime.getRuntime().availableProcessors(), archives.size());
- log.info("Loading {} connector definitions with a thread pool of size {}", archives.size(), nThreads);
- oneTimeExecutor = Executors.newFixedThreadPool(nThreads,
- new ThreadFactoryBuilder().setNameFormat("connector-extraction-executor-%d").build());
- List>> futures = new ArrayList<>();
- for (Path archive : archives) {
- CompletableFuture> future = CompletableFuture.supplyAsync(() ->
- getConnectorDefinitionEntry(archive, narExtractionDirectory), oneTimeExecutor);
- futures.add(future);
- }
-
- FutureUtil.waitForAll(futures).join();
- return futures.stream()
- .map(CompletableFuture::join)
- .filter(entry -> entry != null)
- .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (a, b) -> a, TreeMap::new));
- } finally {
- if (oneTimeExecutor != null) {
- oneTimeExecutor.shutdown();
- }
- }
- }
-
- private static Map.Entry getConnectorDefinitionEntry(Path archive,
- String narExtractionDirectory) {
- try {
-
- NarClassLoader ncl = NarClassLoaderBuilder.builder()
- .narFile(new File(archive.toString()))
- .extractionDirectory(narExtractionDirectory)
- .build();
-
- Connector.ConnectorBuilder connectorBuilder = Connector.builder();
- ConnectorDefinition cntDef = ConnectorUtils.getConnectorDefinition(ncl);
- log.info("Found connector {} from {}", cntDef, archive);
-
- connectorBuilder.archivePath(archive);
- if (!StringUtils.isEmpty(cntDef.getSourceClass())) {
- if (!StringUtils.isEmpty(cntDef.getSourceConfigClass())) {
- connectorBuilder.sourceConfigFieldDefinitions(
- ConnectorUtils.getConnectorConfigDefinition(ncl,
- cntDef.getSourceConfigClass()));
+ try {
+ ConnectorDefinition cntDef = ConnectorUtils.getConnectorDefinition(archive.toFile());
+ log.info("Found connector {} from {}", cntDef, archive);
+ Connector connector = new Connector(archive, cntDef, narExtractionDirectory, enableClassloading);
+ connectors.put(cntDef.getName(), connector);
+ } catch (Throwable t) {
+ log.warn("Failed to load connector from {}", archive, t);
}
}
-
- if (!StringUtils.isEmpty(cntDef.getSinkClass())) {
- if (!StringUtils.isEmpty(cntDef.getSinkConfigClass())) {
- connectorBuilder.sinkConfigFieldDefinitions(
- ConnectorUtils.getConnectorConfigDefinition(ncl, cntDef.getSinkConfigClass()));
- }
- }
-
- connectorBuilder.classLoader(ncl);
- connectorBuilder.connectorDefinition(cntDef);
- return new AbstractMap.SimpleEntry(cntDef.getName(), connectorBuilder.build());
- } catch (Throwable t) {
- log.warn("Failed to load connector from {}", archive, t);
- return null;
}
+ return connectors;
}
}
diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionCommonTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionCommonTest.java
index 131f153b08d68..90fdd4da777d3 100644
--- a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionCommonTest.java
+++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionCommonTest.java
@@ -30,16 +30,17 @@
import com.github.tomakehurst.wiremock.WireMockServer;
import java.io.File;
import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
import lombok.Cleanup;
+import net.bytebuddy.description.type.TypeDefinition;
+import net.bytebuddy.pool.TypePool;
import org.apache.pulsar.client.impl.MessageIdImpl;
-import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.api.WindowContext;
import org.apache.pulsar.functions.api.WindowFunction;
import org.assertj.core.util.Files;
-import org.testng.Assert;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@@ -47,41 +48,6 @@
* Unit test of {@link Exceptions}.
*/
public class FunctionCommonTest {
-
- @Test
- public void testValidateLocalFileUrl() throws Exception {
- String fileLocation = FutureUtil.class.getProtectionDomain().getCodeSource().getLocation().getPath();
- try {
- // eg: fileLocation : /dir/fileName.jar (invalid)
- FunctionCommon.extractClassLoader(fileLocation);
- Assert.fail("should fail with invalid url: without protocol");
- } catch (IllegalArgumentException ie) {
- // Ok.. expected exception
- }
- String fileLocationWithProtocol = "file://" + fileLocation;
- // eg: fileLocation : file:///dir/fileName.jar (valid)
- FunctionCommon.extractClassLoader(fileLocationWithProtocol);
- // eg: fileLocation : file:/dir/fileName.jar (valid)
- fileLocationWithProtocol = "file:" + fileLocation;
- FunctionCommon.extractClassLoader(fileLocationWithProtocol);
- }
-
- @Test
- public void testValidateHttpFileUrl() throws Exception {
-
- String jarHttpUrl = "https://repo1.maven.org/maven2/org/apache/pulsar/pulsar-common/2.4.2/pulsar-common-2.4.2.jar";
- FunctionCommon.extractClassLoader(jarHttpUrl);
-
- jarHttpUrl = "http://_invalidurl_.com";
- try {
- // eg: fileLocation : /dir/fileName.jar (invalid)
- FunctionCommon.extractClassLoader(jarHttpUrl);
- Assert.fail("should fail with invalid url: without protocol");
- } catch (Exception ie) {
- // Ok.. expected exception
- }
- }
-
@Test
public void testDownloadFile() throws Exception {
final String jarHttpUrl = "https://repo1.maven.org/maven2/org/apache/pulsar/pulsar-common/2.4.2/pulsar-common-2.4.2.jar";
@@ -150,6 +116,14 @@ public Record process(String input, Context context) throws Exception {
}
}, false
},
+ {
+ new Function>() {
+ @Override
+ public CompletableFuture process(String input, Context context) throws Exception {
+ return null;
+ }
+ }, false
+ },
{
new java.util.function.Function() {
@Override
@@ -166,6 +140,14 @@ public Record apply(String s) {
}
}, false
},
+ {
+ new java.util.function.Function>() {
+ @Override
+ public CompletableFuture apply(String s) {
+ return null;
+ }
+ }, false
+ },
{
new WindowFunction() {
@Override
@@ -182,6 +164,14 @@ public Record process(Collection> input, WindowContext c
}
}, true
},
+ {
+ new WindowFunction>() {
+ @Override
+ public CompletableFuture process(Collection> input, WindowContext context) throws Exception {
+ return null;
+ }
+ }, true
+ },
{
new java.util.function.Function, Integer>() {
@Override
@@ -197,15 +187,26 @@ public Record apply(Collection strings) {
return null;
}
}, true
+ },
+ {
+ new java.util.function.Function, CompletableFuture>() {
+ @Override
+ public CompletableFuture apply(Collection strings) {
+ return null;
+ }
+ }, true
}
};
}
@Test(dataProvider = "function")
public void testGetFunctionTypes(Object function, boolean isWindowConfigPresent) {
- Class>[] types = FunctionCommon.getFunctionTypes(function.getClass(), isWindowConfigPresent);
+ TypePool typePool = TypePool.Default.of(function.getClass().getClassLoader());
+ TypeDefinition[] types =
+ FunctionCommon.getFunctionTypes(typePool.describe(function.getClass().getName()).resolve(),
+ isWindowConfigPresent);
assertEquals(types.length, 2);
- assertEquals(types[0], String.class);
- assertEquals(types[1], Integer.class);
+ assertEquals(types[0].asErasure().getTypeName(), String.class.getName());
+ assertEquals(types[1].asErasure().getTypeName(), Integer.class.getName());
}
}
diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java
index 8f46199e8ffd5..954eef44a7366 100644
--- a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java
+++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java
@@ -18,13 +18,24 @@
*/
package org.apache.pulsar.functions.utils;
+import static org.apache.pulsar.common.functions.FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE;
+import static org.apache.pulsar.common.functions.FunctionConfig.Runtime.PYTHON;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertThrows;
+import static org.testng.Assert.assertTrue;
import com.google.gson.Gson;
-
-import org.apache.pulsar.client.api.CompressionType;
-import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.util.JsonFormat;
+import java.lang.reflect.Field;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.CompressionType;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.impl.schema.JSONSchema;
import org.apache.pulsar.common.functions.ConsumerConfig;
import org.apache.pulsar.common.functions.FunctionConfig;
@@ -32,28 +43,29 @@
import org.apache.pulsar.common.functions.Resources;
import org.apache.pulsar.common.functions.WindowConfig;
import org.apache.pulsar.common.util.Reflections;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.functions.api.WindowContext;
+import org.apache.pulsar.functions.api.WindowFunction;
import org.apache.pulsar.functions.api.utils.IdentityFunction;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
import org.testng.annotations.Test;
-import java.lang.reflect.Field;
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.apache.pulsar.common.functions.FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE;
-import static org.apache.pulsar.common.functions.FunctionConfig.Runtime.PYTHON;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertFalse;
-import static org.testng.Assert.assertNull;
-import static org.testng.Assert.assertThrows;
-import static org.testng.Assert.assertTrue;
-
/**
* Unit test of {@link Reflections}.
*/
@Slf4j
public class FunctionConfigUtilsTest {
+ public static class WordCountWindowFunction implements WindowFunction {
+ @Override
+ public Void process(Collection> inputs, WindowContext context) throws Exception {
+ for (Record input : inputs) {
+ Arrays.asList(input.getValue().split("\\.")).forEach(word -> context.incrCounter(word, 1));
+ }
+ return null;
+ }
+ }
+
@Test
public void testAutoAckConvertFailed() {
@@ -63,7 +75,7 @@ public void testAutoAckConvertFailed() {
functionConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATMOST_ONCE);
assertThrows(IllegalArgumentException.class, () -> {
- FunctionConfigUtils.convert(functionConfig, (ClassLoader) null);
+ FunctionConfigUtils.convert(functionConfig);
});
}
@@ -99,7 +111,7 @@ public void testConvertBackFidelity() {
producerConfig.setBatchBuilder("DEFAULT");
producerConfig.setCompressionType(CompressionType.ZLIB);
functionConfig.setProducerConfig(producerConfig);
- Function.FunctionDetails functionDetails = FunctionConfigUtils.convert(functionConfig, (ClassLoader) null);
+ Function.FunctionDetails functionDetails = FunctionConfigUtils.convert(functionConfig);
FunctionConfig convertedConfig = FunctionConfigUtils.convertFromDetails(functionDetails);
// add default resources
@@ -119,7 +131,7 @@ public void testConvertWindow() {
functionConfig.setNamespace("test-namespace");
functionConfig.setName("test-function");
functionConfig.setParallelism(1);
- functionConfig.setClassName(IdentityFunction.class.getName());
+ functionConfig.setClassName(WordCountWindowFunction.class.getName());
Map inputSpecs = new HashMap<>();
inputSpecs.put("test-input", ConsumerConfig.builder().isRegexPattern(true).serdeClassName("test-serde").build());
functionConfig.setInputSpecs(inputSpecs);
@@ -141,7 +153,7 @@ public void testConvertWindow() {
producerConfig.setBatchBuilder("KEY_BASED");
producerConfig.setCompressionType(CompressionType.SNAPPY);
functionConfig.setProducerConfig(producerConfig);
- Function.FunctionDetails functionDetails = FunctionConfigUtils.convert(functionConfig, (ClassLoader) null);
+ Function.FunctionDetails functionDetails = FunctionConfigUtils.convert(functionConfig);
FunctionConfig convertedConfig = FunctionConfigUtils.convertFromDetails(functionDetails);
// WindowsFunction guarantees convert to FunctionGuarantees.
@@ -163,7 +175,7 @@ public void testConvertBatchBuilder() {
FunctionConfig functionConfig = createFunctionConfig();
functionConfig.setBatchBuilder("KEY_BASED");
- Function.FunctionDetails functionDetails = FunctionConfigUtils.convert(functionConfig, (ClassLoader) null);
+ Function.FunctionDetails functionDetails = FunctionConfigUtils.convert(functionConfig);
assertEquals(functionDetails.getSink().getProducerSpec().getBatchBuilder(), "KEY_BASED");
FunctionConfig convertedConfig = FunctionConfigUtils.convertFromDetails(functionDetails);
@@ -519,7 +531,6 @@ private FunctionConfig createFunctionConfig() {
functionConfig.setUserConfig(new HashMap<>());
functionConfig.setAutoAck(true);
functionConfig.setTimeoutMs(2000L);
- functionConfig.setWindowConfig(new WindowConfig().setWindowLengthCount(10));
functionConfig.setCleanupSubscription(true);
functionConfig.setRuntimeFlags("-Dfoo=bar");
return functionConfig;
@@ -553,7 +564,7 @@ public void testDisableForwardSourceMessageProperty() throws InvalidProtocolBuff
config.setForwardSourceMessageProperty(true);
FunctionConfigUtils.inferMissingArguments(config, false);
assertNull(config.getForwardSourceMessageProperty());
- FunctionDetails details = FunctionConfigUtils.convert(config, FunctionConfigUtilsTest.class.getClassLoader());
+ FunctionDetails details = FunctionConfigUtils.convert(config);
assertFalse(details.getSink().getForwardSourceMessageProperty());
String detailsJson = "'" + JsonFormat.printer().omittingInsignificantWhitespace().print(details) + "'";
log.info("Function details : {}", detailsJson);
@@ -640,7 +651,7 @@ public void testMergeDifferentOutputSchemaTypes() {
@Test
public void testPoolMessages() {
FunctionConfig functionConfig = createFunctionConfig();
- Function.FunctionDetails functionDetails = FunctionConfigUtils.convert(functionConfig, (ClassLoader) null);
+ Function.FunctionDetails functionDetails = FunctionConfigUtils.convert(functionConfig);
assertFalse(functionDetails.getSource().getInputSpecsMap().get("test-input").getPoolMessages());
FunctionConfig convertedConfig = FunctionConfigUtils.convertFromDetails(functionDetails);
assertFalse(convertedConfig.getInputSpecs().get("test-input").isPoolMessages());
@@ -650,7 +661,7 @@ public void testPoolMessages() {
.poolMessages(true).build());
functionConfig.setInputSpecs(inputSpecs);
- functionDetails = FunctionConfigUtils.convert(functionConfig, (ClassLoader) null);
+ functionDetails = FunctionConfigUtils.convert(functionConfig);
assertTrue(functionDetails.getSource().getInputSpecsMap().get("test-input").getPoolMessages());
convertedConfig = FunctionConfigUtils.convertFromDetails(functionDetails);
diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java
index 8ac9b61e3f60f..14cd77f60ff95 100644
--- a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java
+++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java
@@ -18,37 +18,38 @@
*/
package org.apache.pulsar.functions.utils;
+import static org.apache.pulsar.common.functions.FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE;
+import static org.apache.pulsar.common.functions.FunctionConfig.ProcessingGuarantees.ATMOST_ONCE;
+import static org.apache.pulsar.common.functions.FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertThrows;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.expectThrows;
import com.google.common.collect.Lists;
import com.google.gson.Gson;
+import java.io.IOException;
+import java.lang.reflect.Field;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.Accessors;
import org.apache.pulsar.common.functions.ConsumerConfig;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.functions.Resources;
+import org.apache.pulsar.common.io.ConnectorDefinition;
import org.apache.pulsar.common.io.SinkConfig;
import org.apache.pulsar.config.validation.ConfigValidationAnnotations;
-import org.apache.pulsar.functions.api.utils.IdentityFunction;
+import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.proto.Function;
+import org.apache.pulsar.io.core.Sink;
+import org.apache.pulsar.io.core.SinkContext;
import org.testng.annotations.Test;
-import java.io.IOException;
-import java.lang.reflect.Field;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.pulsar.common.functions.FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE;
-import static org.apache.pulsar.common.functions.FunctionConfig.ProcessingGuarantees.ATMOST_ONCE;
-import static org.apache.pulsar.common.functions.FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertFalse;
-import static org.testng.Assert.assertThrows;
-import static org.testng.Assert.assertTrue;
-import static org.testng.Assert.expectThrows;
-
/**
* Unit test of {@link SinkConfigUtilsTest}.
*/
@@ -62,6 +63,27 @@ public static class TestSinkConfig {
private String configParameter;
}
+
+ public static class NopSink implements Sink