Skip to content
This repository has been archived by the owner on Apr 1, 2024. It is now read-only.

Commit

Permalink
[improve][fn] Optimize Function Worker startup by lazy loading and di…
Browse files Browse the repository at this point in the history
…rect zip/bytecode access (apache#22122)
  • Loading branch information
lhotari authored Feb 26, 2024
1 parent 1b1cfb5 commit bbc6224
Show file tree
Hide file tree
Showing 43 changed files with 3,639 additions and 4,807 deletions.
15 changes: 14 additions & 1 deletion conf/functions_worker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,16 @@ metadataStoreOperationTimeoutSeconds: 30
# Metadata store cache expiry time in seconds
metadataStoreCacheExpirySeconds: 300

# Specifies whether the function worker uses classloading to validate submissions of built-in
# connectors and functions. This is required for validateConnectorConfig to take effect.
# Default is false.
enableClassloadingOfBuiltinFiles: false

# Specifies whether the function worker uses classloading to validate submissions of external
# connectors and functions. This is required for validateConnectorConfig to take effect.
# Default is false.
enableClassloadingOfExternalFiles: false

################################
# Function package management
################################
Expand Down Expand Up @@ -400,7 +410,10 @@ saslJaasServerRoleTokenSignerSecretPath:
connectorsDirectory: ./connectors
functionsDirectory: ./functions

# Should connector config be validated during submission
# Enables extended, fine-grained, annotation-based validation of connector config
# during submission. Classloading must be enabled on the worker with either
# enableClassloadingOfExternalFiles or enableClassloadingOfBuiltinFiles for this to take effect.
# Default is false.
validateConnectorConfig: false

# Whether to initialize distributed log metadata by runtime.
Expand Down
4 changes: 4 additions & 0 deletions distribution/server/src/assemble/LICENSE.bin.txt
Original file line number Diff line number Diff line change
Expand Up @@ -452,6 +452,10 @@ The Apache Software License, Version 2.0
* Jodah
- net.jodah-typetools-0.5.0.jar
- net.jodah-failsafe-2.4.4.jar
* Byte Buddy
- net.bytebuddy-byte-buddy-1.14.12.jar
* zt-zip
- org.zeroturnaround-zt-zip-1.17.jar
* Apache Avro
- org.apache.avro-avro-1.11.3.jar
- org.apache.avro-avro-protobuf-1.11.3.jar
Expand Down
14 changes: 14 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,8 @@ flexible messaging model and an intuitive client API.</description>
<docker-maven.version>0.43.3</docker-maven.version>
<docker.verbose>true</docker.verbose>
<typetools.version>0.5.0</typetools.version>
<byte-buddy.version>1.14.12</byte-buddy.version>
<zt-zip.version>1.17</zt-zip.version>
<protobuf3.version>3.19.6</protobuf3.version>
<protoc3.version>${protobuf3.version}</protoc3.version>
<grpc.version>1.55.3</grpc.version>
Expand Down Expand Up @@ -1066,6 +1068,18 @@ flexible messaging model and an intuitive client API.</description>
<version>${typetools.version}</version>
</dependency>

<dependency>
<groupId>net.bytebuddy</groupId>
<artifactId>byte-buddy</artifactId>
<version>${byte-buddy.version}</version>
</dependency>

<dependency>
<groupId>org.zeroturnaround</groupId>
<artifactId>zt-zip</artifactId>
<version>${zt-zip.version}</version>
</dependency>

<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-bom</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,11 @@ public NarClassLoader run() {
});
}

/**
 * Unpacks the given NAR archive and returns the class path entries of the unpacked directory.
 *
 * <p>The archive is unpacked via {@code NarUnpacker.unpackNar} into the directory derived from
 * {@code narExtractionDirectory}; the resulting directory is then handed to
 * {@code getClassPathEntries}.
 *
 * @param narPath
 *            the NAR archive to unpack
 * @param narExtractionDirectory
 *            configured base directory under which NAR archives are extracted
 * @return the class path entries computed for the unpacked archive
 * @throws IOException
 *             propagated from unpacking the archive
 */
public static List<File> getClasspathFromArchive(File narPath, String narExtractionDirectory) throws IOException {
    final File extractionDirectory = getNarExtractionDirectory(narExtractionDirectory);
    final File unpackedRoot = NarUnpacker.unpackNar(narPath, extractionDirectory);
    return getClassPathEntries(unpackedRoot);
}

/**
 * Builds the directory used for NAR extraction by appending {@code TMP_DIR_PREFIX} to the
 * configured directory, separated by a forward slash.
 *
 * @param configuredDirectory
 *            the configured base directory for NAR extraction
 * @return a {@link File} pointing at {@code configuredDirectory + "/" + TMP_DIR_PREFIX}
 */
private static File getNarExtractionDirectory(String configuredDirectory) {
    final String extractionPath = configuredDirectory + "/" + TMP_DIR_PREFIX;
    return new File(extractionPath);
}
Expand All @@ -164,16 +169,11 @@ private static File getNarExtractionDirectory(String configuredDirectory) {
* @param narWorkingDirectory
* directory to explode nar contents to
* @param parent
* @throws IllegalArgumentException
* if the NAR is missing the Java Services API file for <tt>FlowFileProcessor</tt> implementations.
* @throws ClassNotFoundException
* if any of the <tt>FlowFileProcessor</tt> implementations defined by the Java Services API cannot be
* loaded.
* @throws IOException
* if an error occurs while loading the NAR.
*/
private NarClassLoader(final File narWorkingDirectory, Set<String> additionalJars, ClassLoader parent)
throws ClassNotFoundException, IOException {
throws IOException {
super(new URL[0], parent);
this.narWorkingDirectory = narWorkingDirectory;

Expand Down Expand Up @@ -238,22 +238,31 @@ public List<String> getServiceImplementation(String serviceName) throws IOExcept
* if the URL list could not be updated.
*/
private void updateClasspath(File root) throws IOException {
addURL(root.toURI().toURL()); // for compiled classes, META-INF/, etc.
getClassPathEntries(root).forEach(f -> {
try {
addURL(f.toURI().toURL());
} catch (IOException e) {
log.error("Failed to add entry to classpath: {}", f, e);
}
});
}

static List<File> getClassPathEntries(File root) {
List<File> classPathEntries = new ArrayList<>();
classPathEntries.add(root);
File dependencies = new File(root, "META-INF/bundled-dependencies");
if (!dependencies.isDirectory()) {
log.warn("{} does not contain META-INF/bundled-dependencies!", narWorkingDirectory);
log.warn("{} does not contain META-INF/bundled-dependencies!", root);
}
addURL(dependencies.toURI().toURL());
classPathEntries.add(dependencies);
if (dependencies.isDirectory()) {
final File[] jarFiles = dependencies.listFiles(JAR_FILTER);
if (jarFiles != null) {
Arrays.sort(jarFiles, Comparator.comparing(File::getName));
for (File libJar : jarFiles) {
addURL(libJar.toURI().toURL());
}
classPathEntries.addAll(Arrays.asList(jarFiles));
}
}
return classPathEntries;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,14 @@
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.file.Path;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;
import java.util.Enumeration;
import java.util.concurrent.ConcurrentHashMap;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;
import java.util.zip.ZipEntry;
import java.util.zip.ZipFile;
import lombok.extern.slf4j.Slf4j;

/**
Expand Down Expand Up @@ -113,18 +114,24 @@ static File doUnpackNar(final File nar, final File baseWorkingDirectory, Runnabl
* if the NAR could not be unpacked.
*/
private static void unpack(final File nar, final File workingDirectory) throws IOException {
try (JarFile jarFile = new JarFile(nar)) {
Enumeration<JarEntry> jarEntries = jarFile.entries();
while (jarEntries.hasMoreElements()) {
JarEntry jarEntry = jarEntries.nextElement();
String name = jarEntry.getName();
File f = new File(workingDirectory, name);
if (jarEntry.isDirectory()) {
Path workingDirectoryPath = workingDirectory.toPath().normalize();
try (ZipFile zipFile = new ZipFile(nar)) {
Enumeration<? extends ZipEntry> zipEntries = zipFile.entries();
while (zipEntries.hasMoreElements()) {
ZipEntry zipEntry = zipEntries.nextElement();
String name = zipEntry.getName();
Path targetFilePath = workingDirectoryPath.resolve(name).normalize();
if (!targetFilePath.startsWith(workingDirectoryPath)) {
log.error("Invalid zip file with entry '{}'", name);
throw new IOException("Invalid zip file. Aborting unpacking.");
}
File f = targetFilePath.toFile();
if (zipEntry.isDirectory()) {
FileUtils.ensureDirectoryExistAndCanReadAndWrite(f);
} else {
// The directory entry might appear after the file entry
FileUtils.ensureDirectoryExistAndCanReadAndWrite(f.getParentFile());
makeFile(jarFile.getInputStream(jarEntry), f);
makeFile(zipFile.getInputStream(zipEntry), f);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,9 @@
import lombok.Value;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.functions.FunctionDefinition;
import org.apache.pulsar.common.functions.Utils;
import org.apache.pulsar.common.io.ConnectorDefinition;
import org.apache.pulsar.common.io.SinkConfig;
import org.apache.pulsar.common.io.SourceConfig;
import org.apache.pulsar.common.nar.FileUtils;
Expand All @@ -75,8 +77,11 @@
import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.utils.FunctionConfigUtils;
import org.apache.pulsar.functions.utils.FunctionRuntimeCommon;
import org.apache.pulsar.functions.utils.LoadedFunctionPackage;
import org.apache.pulsar.functions.utils.SinkConfigUtils;
import org.apache.pulsar.functions.utils.SourceConfigUtils;
import org.apache.pulsar.functions.utils.ValidatableFunctionPackage;
import org.apache.pulsar.functions.utils.functioncache.FunctionCacheEntry;
import org.apache.pulsar.functions.utils.functions.FunctionArchive;
import org.apache.pulsar.functions.utils.functions.FunctionUtils;
Expand Down Expand Up @@ -357,9 +362,12 @@ public void start(boolean blocking) throws Exception {
userCodeFile = functionConfig.getJar();
userCodeClassLoader = extractClassLoader(
userCodeFile, ComponentType.FUNCTION, functionConfig.getClassName());
ValidatableFunctionPackage validatableFunctionPackage =
new LoadedFunctionPackage(getCurrentOrUserCodeClassLoader(),
FunctionDefinition.class);
functionDetails = FunctionConfigUtils.convert(
functionConfig,
FunctionConfigUtils.validateJavaFunction(functionConfig, getCurrentOrUserCodeClassLoader()));
FunctionConfigUtils.validateJavaFunction(functionConfig, validatableFunctionPackage));
} else if (functionConfig.getRuntime() == FunctionConfig.Runtime.GO) {
userCodeFile = functionConfig.getGo();
} else if (functionConfig.getRuntime() == FunctionConfig.Runtime.PYTHON) {
Expand All @@ -369,24 +377,30 @@ public void start(boolean blocking) throws Exception {
}

if (functionDetails == null) {
functionDetails = FunctionConfigUtils.convert(functionConfig, getCurrentOrUserCodeClassLoader());
ValidatableFunctionPackage validatableFunctionPackage =
new LoadedFunctionPackage(getCurrentOrUserCodeClassLoader(),
FunctionDefinition.class);
functionDetails = FunctionConfigUtils.convert(functionConfig, validatableFunctionPackage);
}
} else if (sourceConfig != null) {
inferMissingArguments(sourceConfig);
userCodeFile = sourceConfig.getArchive();
parallelism = sourceConfig.getParallelism();
userCodeClassLoader = extractClassLoader(
userCodeFile, ComponentType.SOURCE, sourceConfig.getClassName());
functionDetails = SourceConfigUtils.convert(
sourceConfig,
SourceConfigUtils.validateAndExtractDetails(sourceConfig, getCurrentOrUserCodeClassLoader(), true));
ValidatableFunctionPackage validatableFunctionPackage =
new LoadedFunctionPackage(getCurrentOrUserCodeClassLoader(), ConnectorDefinition.class);
functionDetails = SourceConfigUtils.convert(sourceConfig,
SourceConfigUtils.validateAndExtractDetails(sourceConfig, validatableFunctionPackage, true));
} else if (sinkConfig != null) {
inferMissingArguments(sinkConfig);
userCodeFile = sinkConfig.getArchive();
transformFunctionFile = sinkConfig.getTransformFunction();
parallelism = sinkConfig.getParallelism();
userCodeClassLoader = extractClassLoader(
userCodeFile, ComponentType.SINK, sinkConfig.getClassName());
ValidatableFunctionPackage validatableFunctionPackage =
new LoadedFunctionPackage(getCurrentOrUserCodeClassLoader(), ConnectorDefinition.class);
if (isNotEmpty(sinkConfig.getTransformFunction())) {
transformFunctionCodeClassLoader = extractClassLoader(
sinkConfig.getTransformFunction(),
Expand All @@ -395,16 +409,19 @@ public void start(boolean blocking) throws Exception {
}

ClassLoader functionClassLoader = null;
ValidatableFunctionPackage validatableTransformFunction = null;
if (transformFunctionCodeClassLoader != null) {
functionClassLoader = transformFunctionCodeClassLoader.getClassLoader() == null
? Thread.currentThread().getContextClassLoader()
: transformFunctionCodeClassLoader.getClassLoader();
validatableTransformFunction =
new LoadedFunctionPackage(functionClassLoader, FunctionDefinition.class);
}

functionDetails = SinkConfigUtils.convert(
sinkConfig,
SinkConfigUtils.validateAndExtractDetails(sinkConfig, getCurrentOrUserCodeClassLoader(),
functionClassLoader, true));
SinkConfigUtils.validateAndExtractDetails(sinkConfig, validatableFunctionPackage,
validatableTransformFunction, true));
} else {
throw new IllegalArgumentException("Must specify Function, Source or Sink config");
}
Expand Down Expand Up @@ -472,7 +489,7 @@ private UserCodeClassLoader extractClassLoader(String userCodeFile, ComponentTyp
if (classLoader == null) {
if (userCodeFile != null && Utils.isFunctionPackageUrlSupported(userCodeFile)) {
File file = FunctionCommon.extractFileFromPkgURL(userCodeFile);
classLoader = FunctionCommon.getClassLoaderFromPackage(
classLoader = FunctionRuntimeCommon.getClassLoaderFromPackage(
componentType, className, file, narExtractionDirectory);
classLoaderCreated = true;
} else if (userCodeFile != null) {
Expand All @@ -494,7 +511,7 @@ private UserCodeClassLoader extractClassLoader(String userCodeFile, ComponentTyp
}
throw new RuntimeException(errorMsg + " (" + userCodeFile + ") does not exist");
}
classLoader = FunctionCommon.getClassLoaderFromPackage(
classLoader = FunctionRuntimeCommon.getClassLoaderFromPackage(
componentType, className, file, narExtractionDirectory);
classLoaderCreated = true;
} else {
Expand Down Expand Up @@ -713,7 +730,7 @@ private ClassLoader isBuiltInFunction(String functionType) throws IOException {
FunctionArchive function = functions.get(functionName);
if (function != null && function.getFunctionDefinition().getFunctionClass() != null) {
// Function type is a valid built-in type.
return function.getClassLoader();
return function.getFunctionPackage().getClassLoader();
} else {
return null;
}
Expand All @@ -727,7 +744,7 @@ private ClassLoader isBuiltInSource(String sourceType) throws IOException {
Connector connector = connectors.get(source);
if (connector != null && connector.getConnectorDefinition().getSourceClass() != null) {
// Source type is a valid built-in connector type.
return connector.getClassLoader();
return connector.getConnectorFunctionPackage().getClassLoader();
} else {
return null;
}
Expand All @@ -741,18 +758,18 @@ private ClassLoader isBuiltInSink(String sinkType) throws IOException {
Connector connector = connectors.get(sink);
if (connector != null && connector.getConnectorDefinition().getSinkClass() != null) {
// Sink type is a valid built-in connector type
return connector.getClassLoader();
return connector.getConnectorFunctionPackage().getClassLoader();
} else {
return null;
}
}

private TreeMap<String, FunctionArchive> getFunctions() throws IOException {
return FunctionUtils.searchForFunctions(functionsDir);
return FunctionUtils.searchForFunctions(functionsDir, narExtractionDirectory, true);
}

private TreeMap<String, Connector> getConnectors() throws IOException {
return ConnectorUtils.searchForConnectors(connectorsDir, narExtractionDirectory);
return ConnectorUtils.searchForConnectors(connectorsDir, narExtractionDirectory, true);
}

private SecretsProviderConfigurator getSecretsProviderConfigurator() {
Expand Down
Loading

0 comments on commit bbc6224

Please sign in to comment.