From 64a63bb2d198757b95b13bfc72090a30902a231b Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Tue, 9 Nov 2021 12:07:29 +0100 Subject: [PATCH] Pulsar Functions: detect .nar files and prevent spammy logs on functions boot (#13) (#12667) (cherry picked from commit 515a69fc6675322c455d606631cc47490756b453) --- .../apache/pulsar/common/nar/FileUtils.java | 20 +++++++++++++ .../instance/JavaInstanceRunnable.java | 1 + .../runtime/thread/ThreadRuntime.java | 28 +++++++++++++------ 3 files changed, 41 insertions(+), 8 deletions(-) diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/nar/FileUtils.java b/pulsar-common/src/main/java/org/apache/pulsar/common/nar/FileUtils.java index cc677302ed802..0bfdb806165e9 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/nar/FileUtils.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/nar/FileUtils.java @@ -30,6 +30,9 @@ import java.io.IOException; import java.util.Arrays; import java.util.Collection; +import java.util.zip.ZipEntry; +import java.util.zip.ZipFile; +import lombok.extern.slf4j.Slf4j; import org.slf4j.Logger; /** @@ -37,6 +40,7 @@ * operations. * */ +@Slf4j public class FileUtils { public static final long MILLIS_BETWEEN_ATTEMPTS = 50L; @@ -221,5 +225,21 @@ public static void sleepQuietly(final long millis) { /* do nothing */ } } + + public static boolean mayBeANarArchive(File jarFile) { + try (ZipFile zipFile = new ZipFile(jarFile);) { + ZipEntry entry = zipFile.getEntry("META-INF/bundled-dependencies"); + if (entry == null || !entry.isDirectory()) { + log.info("Jar file {} does not contain META-INF/bundled-dependencies, it is not a NAR file", jarFile); + return false; + } else { + log.info("Jar file {} contains META-INF/bundled-dependencies, it may be a NAR file", jarFile); + return true; + } + } catch (IOException err) { + log.info("Cannot safely detect if {} is a NAR archive", jarFile, err); + return true; + } + } } diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java index cfdcb08071846..bbc7ced0ca492 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java @@ -53,6 +53,7 @@ import org.apache.pulsar.common.functions.ProducerConfig; import org.apache.pulsar.common.util.ObjectMapperFactory; import org.apache.pulsar.common.util.Reflections; +import org.apache.pulsar.common.nar.FileUtils; import org.apache.pulsar.functions.api.Function; import org.apache.pulsar.functions.api.Record; import org.apache.pulsar.functions.api.StateStore; diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java index b6dd019140e76..be43bb0f29219 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java @@ -19,6 +19,7 @@ package org.apache.pulsar.functions.runtime.thread; +import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; import java.util.Arrays; @@ -33,6 +34,7 @@ import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.api.ClientBuilder; import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.common.nar.FileUtils; import org.apache.pulsar.functions.instance.InstanceConfig; import org.apache.pulsar.functions.instance.InstanceUtils; import org.apache.pulsar.functions.instance.stats.FunctionCollectorRegistry; @@ -132,14 +134,24 @@ private static ClassLoader loadJars(String jarFile, return Thread.currentThread().getContextClassLoader(); } ClassLoader fnClassLoader; - try { - log.info("Load JAR: {}", jarFile); - // Let's first try to treat it as a nar archive - fnCache.registerFunctionInstanceWithArchive( - instanceConfig.getFunctionId(), - instanceConfig.getInstanceName(), - jarFile, narExtractionDirectory); - } catch (FileNotFoundException e) { + boolean loadedAsNar = false; + if (FileUtils.mayBeANarArchive(new File(jarFile))) { + try { + log.info("Trying Loading file as NAR file: {}", jarFile); + // Let's first try to treat it as a nar archive + fnCache.registerFunctionInstanceWithArchive( + instanceConfig.getFunctionId(), + instanceConfig.getInstanceName(), + jarFile, narExtractionDirectory); + loadedAsNar = true; + } catch (FileNotFoundException e) { + // this is usually like + // java.io.FileNotFoundException: /tmp/pulsar-nar/xxx.jar-unpacked/xxxxx/META-INF/MANIFEST.MF' + log.error("The file {} does not look like a .nar file", jarFile, e.toString()); + } + } + if (!loadedAsNar) { + log.info("Load file as simple JAR file: {}", jarFile); // create the function class loader fnCache.registerFunctionInstance( instanceConfig.getFunctionId(),