diff --git a/conf/functions_worker.yml b/conf/functions_worker.yml index 3871c74a88778..8c62536971990 100644 --- a/conf/functions_worker.yml +++ b/conf/functions_worker.yml @@ -408,15 +408,7 @@ saslJaasServerRoleTokenSignerSecretPath: ######################## connectorsDirectory: ./connectors -# Whether to enable referencing connectors directory files by file url in connector (sink/source) creation -enableReferencingConnectorDirectoryFiles: true -# Regex patterns for enabling creation of connectors by referencing packages in matching http/https urls -additionalEnabledConnectorUrlPatterns: [] functionsDirectory: ./functions -# Whether to enable referencing functions directory files by file url in functions creation -enableReferencingFunctionsDirectoryFiles: true -# Regex patterns for enabling creation of functions by referencing packages in matching http/https urls -additionalEnabledFunctionsUrlPatterns: [] # Enables extended validation for connector config with fine-grain annotation based validation # during submission. Classloading with either enableClassloadingOfExternalFiles or diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java index cbf2f28b0b50b..107aedd076691 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java @@ -34,7 +34,6 @@ import java.net.URL; import java.util.Collections; import java.util.HashSet; -import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Properties; @@ -268,11 +267,6 @@ private PulsarWorkerService createPulsarFunctionWorker(ServiceConfiguration conf workerConfig.setAuthorizationEnabled(config.isAuthorizationEnabled()); workerConfig.setAuthorizationProvider(config.getAuthorizationProvider()); - List urlPatterns = - List.of(getPulsarApiExamplesJar().getParentFile().toURI() + ".*", "http://127\\.0\\.0\\.1:.*"); - workerConfig.setAdditionalEnabledConnectorUrlPatterns(urlPatterns); - workerConfig.setAdditionalEnabledFunctionsUrlPatterns(urlPatterns); - PulsarWorkerService workerService = new PulsarWorkerService(); return workerService; } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java index 72e6a3766aeee..7bcf1dec871e0 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java @@ -268,10 +268,6 @@ private PulsarWorkerService createPulsarFunctionWorker(ServiceConfiguration conf workerConfig.setAuthenticationEnabled(true); workerConfig.setAuthorizationEnabled(true); - List urlPatterns = List.of(getPulsarApiExamplesJar().getParentFile().toURI() + ".*"); - workerConfig.setAdditionalEnabledConnectorUrlPatterns(urlPatterns); - workerConfig.setAdditionalEnabledFunctionsUrlPatterns(urlPatterns); - PulsarWorkerService workerService = new PulsarWorkerService(); return workerService; } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java index 3508cf0bfc7e6..9882b15450e40 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java @@ -20,8 +20,8 @@ import static org.apache.pulsar.common.util.PortManager.nextLockedFreePort; import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertTrue; +import static org.testng.Assert.assertNotNull; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Sets; @@ -32,7 +32,6 @@ import java.util.Collections; import java.util.HashMap; import java.util.HashSet; -import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; @@ -154,12 +153,6 @@ void setup() throws Exception { workerConfig.setUseTls(true); workerConfig.setTlsEnableHostnameVerification(true); workerConfig.setTlsAllowInsecureConnection(false); - File packagePath = new File( - PulsarSink.class.getProtectionDomain().getCodeSource().getLocation().getPath()).getParentFile(); - List urlPatterns = - List.of(packagePath.toURI() + ".*"); - workerConfig.setAdditionalEnabledConnectorUrlPatterns(urlPatterns); - workerConfig.setAdditionalEnabledFunctionsUrlPatterns(urlPatterns); fnWorkerServices[i] = WorkerServiceLoader.load(workerConfig); configurations[i] = config; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/AbstractPulsarE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/AbstractPulsarE2ETest.java index 3c0dd0822b7dc..f968315a7124c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/io/AbstractPulsarE2ETest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/AbstractPulsarE2ETest.java @@ -35,7 +35,6 @@ import java.util.Collections; import java.util.HashMap; import java.util.HashSet; -import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; @@ -307,11 +306,6 @@ private PulsarWorkerService createPulsarFunctionWorker(ServiceConfiguration conf workerConfig.setAuthenticationEnabled(true); workerConfig.setAuthorizationEnabled(true); - List urlPatterns = - List.of(getPulsarApiExamplesJar().getParentFile().toURI() + ".*", "http://127\\.0\\.0\\.1:.*"); - workerConfig.setAdditionalEnabledConnectorUrlPatterns(urlPatterns); - workerConfig.setAdditionalEnabledFunctionsUrlPatterns(urlPatterns); - PulsarWorkerService workerService = new PulsarWorkerService(); return workerService; } diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java index ec0e620d0ae8b..2d9698103fa0f 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java @@ -264,18 +264,6 @@ public class WorkerConfig implements Serializable, PulsarConfiguration { doc = "The directory where nar packages are extractors" ) private String narExtractionDirectory = NarClassLoader.DEFAULT_NAR_EXTRACTION_DIR; - @FieldContext( - category = CATEGORY_CONNECTORS, - doc = "Whether to enable referencing connectors directory files by file url in connector (sink/source) " - + "creation. Default is true." - ) - private Boolean enableReferencingConnectorDirectoryFiles = true; - @FieldContext( - category = CATEGORY_FUNCTIONS, - doc = "Regex patterns for enabling creation of connectors by referencing packages in matching http/https " - + "urls." - ) - private List additionalEnabledConnectorUrlPatterns = new ArrayList<>(); @FieldContext( category = CATEGORY_CONNECTORS, doc = "Enables extended validation for connector config with fine-grain annotation based validation " @@ -294,18 +282,6 @@ public class WorkerConfig implements Serializable, PulsarConfiguration { doc = "The path to the location to locate builtin functions" ) private String functionsDirectory = "./functions"; - @FieldContext( - category = CATEGORY_FUNCTIONS, - doc = "Whether to enable referencing functions directory files by file url in functions creation. " - + "Default is true." - ) - private Boolean enableReferencingFunctionsDirectoryFiles = true; - @FieldContext( - category = CATEGORY_FUNCTIONS, - doc = "Regex patterns for enabling creation of functions by referencing packages in matching http/https " - + "urls." - ) - private List additionalEnabledFunctionsUrlPatterns = new ArrayList<>(); @FieldContext( category = CATEGORY_FUNC_METADATA_MNG, doc = "The Pulsar topic used for storing function metadata" diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java index 389051fce4217..250a7cc4c7bd4 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java @@ -83,21 +83,18 @@ public class FunctionActioner { private final ConnectorsManager connectorsManager; private final FunctionsManager functionsManager; private final PulsarAdmin pulsarAdmin; - private final PackageUrlValidator packageUrlValidator; public FunctionActioner(WorkerConfig workerConfig, RuntimeFactory runtimeFactory, Namespace dlogNamespace, ConnectorsManager connectorsManager, - FunctionsManager functionsManager, PulsarAdmin pulsarAdmin, - PackageUrlValidator packageUrlValidator) { + FunctionsManager functionsManager, PulsarAdmin pulsarAdmin) { this.workerConfig = workerConfig; this.runtimeFactory = runtimeFactory; this.dlogNamespace = dlogNamespace; this.connectorsManager = connectorsManager; this.functionsManager = functionsManager; this.pulsarAdmin = pulsarAdmin; - this.packageUrlValidator = packageUrlValidator; } @@ -155,9 +152,6 @@ private String getPackageFile(FunctionMetaData functionMetaData, FunctionDetails boolean isPkgUrlProvided = isFunctionPackageUrlSupported(packagePath); String packageFile; if (isPkgUrlProvided && packagePath.startsWith(FILE)) { - if (!packageUrlValidator.isValidPackageUrl(componentType, packagePath)) { - throw new IllegalArgumentException("Package URL " + packagePath + " is not valid"); - } URL url = new URL(packagePath); File pkgFile = new File(url.toURI()); packageFile = pkgFile.getAbsolutePath(); @@ -174,7 +168,7 @@ private String getPackageFile(FunctionMetaData functionMetaData, FunctionDetails pkgDir, new File(getDownloadFileName(functionMetaData.getFunctionDetails(), pkgLocation)).getName()); - downloadFile(pkgFile, isPkgUrlProvided, functionMetaData, instanceId, pkgLocation, componentType); + downloadFile(pkgFile, isPkgUrlProvided, functionMetaData, instanceId, pkgLocation); packageFile = pkgFile.getAbsolutePath(); } return packageFile; @@ -233,8 +227,7 @@ InstanceConfig createInstanceConfig(FunctionDetails functionDetails, Function.Fu } private void downloadFile(File pkgFile, boolean isPkgUrlProvided, FunctionMetaData functionMetaData, - int instanceId, Function.PackageLocationMetaData pkgLocation, - FunctionDetails.ComponentType componentType) + int instanceId, Function.PackageLocationMetaData pkgLocation) throws IOException, PulsarAdminException { FunctionDetails details = functionMetaData.getFunctionDetails(); @@ -259,9 +252,6 @@ private void downloadFile(File pkgFile, boolean isPkgUrlProvided, FunctionMetaDa downloadFromHttp ? pkgLocationPath : pkgLocation); if (downloadFromHttp) { - if (!packageUrlValidator.isValidPackageUrl(componentType, pkgLocationPath)) { - throw new IllegalArgumentException("Package URL " + pkgLocationPath + " is not valid"); - } FunctionCommon.downloadFromHttpUrl(pkgLocationPath, tempPkgFile); } else if (downloadFromPackageManagementService) { getPulsarAdmin().packages().download(pkgLocationPath, tempPkgFile.getPath()); diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java index b6e2bbb1ca0f8..8e6725e93af91 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java @@ -220,8 +220,7 @@ public FunctionRuntimeManager(WorkerConfig workerConfig, PulsarWorkerService wor functionAuthProvider, runtimeCustomizer); this.functionActioner = new FunctionActioner(this.workerConfig, runtimeFactory, - dlogNamespace, connectorsManager, functionsManager, workerService.getBrokerAdmin(), - workerService.getPackageUrlValidator()); + dlogNamespace, connectorsManager, functionsManager, workerService.getBrokerAdmin()); this.membershipManager = membershipManager; this.functionMetaDataManager = functionMetaDataManager; diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PackageUrlValidator.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PackageUrlValidator.java deleted file mode 100644 index 2a8fe8dddb153..0000000000000 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PackageUrlValidator.java +++ /dev/null @@ -1,101 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.functions.worker; - -import java.net.URI; -import java.nio.file.Path; -import java.util.Collections; -import java.util.List; -import java.util.regex.Pattern; -import java.util.stream.Collectors; -import org.apache.pulsar.functions.proto.Function; - -/** - * Validates package URLs for functions and connectors. - * Validates that the package URL is either a file in the connectors or functions directory - * when referencing connector or function files is enabled, or matches one of the additional url patterns. - */ -public class PackageUrlValidator { - private final Path connectionsDirectory; - private final Path functionsDirectory; - private final List additionalConnectionsPatterns; - private final List additionalFunctionsPatterns; - - public PackageUrlValidator(WorkerConfig workerConfig) { - this.connectionsDirectory = resolveDirectory(workerConfig.getEnableReferencingConnectorDirectoryFiles(), - workerConfig.getConnectorsDirectory()); - this.functionsDirectory = resolveDirectory(workerConfig.getEnableReferencingFunctionsDirectoryFiles(), - workerConfig.getFunctionsDirectory()); - this.additionalConnectionsPatterns = - compilePatterns(workerConfig.getAdditionalEnabledConnectorUrlPatterns()); - this.additionalFunctionsPatterns = - compilePatterns(workerConfig.getAdditionalEnabledFunctionsUrlPatterns()); - } - - private static Path resolveDirectory(Boolean enabled, String directory) { - return enabled != null && enabled - ? Path.of(directory).normalize().toAbsolutePath() : null; - } - - private static List compilePatterns(List additionalPatterns) { - return additionalPatterns != null ? additionalPatterns.stream().map(Pattern::compile).collect( - Collectors.toList()) : Collections.emptyList(); - } - - boolean isValidFunctionsPackageUrl(URI functionPkgUrl) { - return doesMatch(functionPkgUrl, functionsDirectory, additionalFunctionsPatterns); - } - - boolean isValidConnectionsPackageUrl(URI functionPkgUrl) { - return doesMatch(functionPkgUrl, connectionsDirectory, additionalConnectionsPatterns); - } - - private boolean doesMatch(URI functionPkgUrl, Path directory, List patterns) { - if (directory != null && "file".equals(functionPkgUrl.getScheme())) { - Path filePath = Path.of(functionPkgUrl.getPath()).normalize().toAbsolutePath(); - if (filePath.startsWith(directory)) { - return true; - } - } - String functionPkgUrlString = functionPkgUrl.normalize().toString(); - for (Pattern pattern : patterns) { - if (pattern.matcher(functionPkgUrlString).matches()) { - return true; - } - } - return false; - } - - public boolean isValidPackageUrl(Function.FunctionDetails.ComponentType componentType, String functionPkgUrl) { - URI uri = URI.create(functionPkgUrl); - if (componentType == null) { - // if component type is not specified, we need to check both functions and connections - return isValidFunctionsPackageUrl(uri) || isValidConnectionsPackageUrl(uri); - } - switch (componentType) { - case FUNCTION: - return isValidFunctionsPackageUrl(uri); - case SINK: - case SOURCE: - return isValidConnectionsPackageUrl(uri); - default: - throw new IllegalArgumentException("Unknown component type: " + componentType); - } - } -} diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerService.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerService.java index 84b943e5671ac..f9f2738828be7 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerService.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerService.java @@ -119,8 +119,7 @@ public interface PulsarClientCreator { private Sinks sinks; private Sources sources; private Workers workers; - @Getter - private PackageUrlValidator packageUrlValidator; + private final PulsarClientCreator clientCreator; public PulsarWorkerService() { @@ -197,7 +196,6 @@ public void init(WorkerConfig workerConfig, this.sinks = new SinksImpl(() -> PulsarWorkerService.this); this.sources = new SourcesImpl(() -> PulsarWorkerService.this); this.workers = new WorkerImpl(() -> PulsarWorkerService.this); - this.packageUrlValidator = new PackageUrlValidator(workerConfig); } @Override diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java index 6d07e5870917a..5e105f7057e33 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java @@ -1376,17 +1376,11 @@ private StreamingOutput getStreamingOutput(String pkgPath) { private StreamingOutput getStreamingOutput(String pkgPath, FunctionDetails.ComponentType componentType) { return output -> { if (pkgPath.startsWith(Utils.HTTP)) { - if (!worker().getPackageUrlValidator().isValidPackageUrl(componentType, pkgPath)) { - throw new IllegalArgumentException("Invalid package url: " + pkgPath); - } URL url = URI.create(pkgPath).toURL(); try (InputStream inputStream = url.openStream()) { IOUtils.copy(inputStream, output); } } else if (pkgPath.startsWith(Utils.FILE)) { - if (!worker().getPackageUrlValidator().isValidPackageUrl(componentType, pkgPath)) { - throw new IllegalArgumentException("Invalid package url: " + pkgPath); - } URI url = URI.create(pkgPath); File file = new File(url.getPath()); Files.copy(file.toPath(), output); @@ -1810,17 +1804,12 @@ static File downloadPackageFile(PulsarWorkerService worker, String packageName) return file; } - protected File getPackageFile(FunctionDetails.ComponentType componentType, String functionPkgUrl, - String existingPackagePath, InputStream uploadedInputStream) + protected File getPackageFile(String functionPkgUrl, String existingPackagePath, InputStream uploadedInputStream) throws IOException, PulsarAdminException { File componentPackageFile = null; if (isNotBlank(functionPkgUrl)) { - componentPackageFile = getPackageFile(componentType, functionPkgUrl); + componentPackageFile = getPackageFile(functionPkgUrl); } else if (existingPackagePath.startsWith(Utils.FILE) || existingPackagePath.startsWith(Utils.HTTP)) { - if (!worker().getPackageUrlValidator().isValidPackageUrl(componentType, functionPkgUrl)) { - throw new IllegalArgumentException("Function Package url is not valid." - + "supported url (http/https/file)"); - } try { componentPackageFile = FunctionCommon.extractFileFromPkgURL(existingPackagePath); } catch (Exception e) { @@ -1829,7 +1818,7 @@ protected File getPackageFile(FunctionDetails.ComponentType componentType, Strin ComponentTypeUtils.toString(componentType), functionPkgUrl)); } } else if (Utils.hasPackageTypePrefix(existingPackagePath)) { - componentPackageFile = getPackageFile(componentType, existingPackagePath); + componentPackageFile = getPackageFile(existingPackagePath); } else if (uploadedInputStream != null) { componentPackageFile = WorkerUtils.dumpToTmpFile(uploadedInputStream); } else if (!existingPackagePath.startsWith(Utils.BUILTIN)) { @@ -1847,16 +1836,15 @@ protected File getPackageFile(FunctionDetails.ComponentType componentType, Strin return componentPackageFile; } - protected File getPackageFile(FunctionDetails.ComponentType componentType, String functionPkgUrl) - throws IOException, PulsarAdminException { + protected File downloadPackageFile(String packageName) throws IOException, PulsarAdminException { + return downloadPackageFile(worker(), packageName); + } + + protected File getPackageFile(String functionPkgUrl) throws IOException, PulsarAdminException { if (Utils.hasPackageTypePrefix(functionPkgUrl)) { - if (!worker().getWorkerConfig().isFunctionsWorkerEnablePackageManagement()) { - throw new IllegalStateException("Function Package management service is disabled. " - + "Please enable it to use " + functionPkgUrl); - } - return downloadPackageFile(worker(), functionPkgUrl); + return downloadPackageFile(functionPkgUrl); } else { - if (!worker().getPackageUrlValidator().isValidPackageUrl(componentType, functionPkgUrl)) { + if (!Utils.isFunctionPackageUrlSupported(functionPkgUrl)) { throw new IllegalArgumentException("Function Package url is not valid." + "supported url (http/https/file)"); } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java index a075d3e18a0b3..6b81d2c4918a6 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java @@ -144,7 +144,7 @@ public void registerFunction(final String tenant, // validate parameters try { if (isNotBlank(functionPkgUrl)) { - componentPackageFile = getPackageFile(componentType, functionPkgUrl); + componentPackageFile = getPackageFile(functionPkgUrl); functionDetails = validateUpdateRequestParams(tenant, namespace, functionName, functionConfig, componentPackageFile); } else { @@ -305,7 +305,6 @@ public void updateFunction(final String tenant, // validate parameters try { componentPackageFile = getPackageFile( - componentType, functionPkgUrl, existingComponent.getPackageLocation().getPackagePath(), uploadedInputStream); diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java index 6b8b41e5a8e5b..51d1333a79c36 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java @@ -143,7 +143,7 @@ public void registerSink(final String tenant, // validate parameters try { if (isNotBlank(sinkPkgUrl)) { - componentPackageFile = getPackageFile(componentType, sinkPkgUrl); + componentPackageFile = getPackageFile(sinkPkgUrl); functionDetails = validateUpdateRequestParams(tenant, namespace, sinkName, sinkConfig, componentPackageFile); } else { @@ -310,7 +310,6 @@ public void updateSink(final String tenant, // validate parameters try { componentPackageFile = getPackageFile( - componentType, sinkPkgUrl, existingComponent.getPackageLocation().getPackagePath(), uploadedInputStream); @@ -422,8 +421,7 @@ private void setTransformFunctionPackageLocation(Function.FunctionMetaData.Build try { String builtin = functionDetails.getBuiltin(); if (isBlank(builtin)) { - functionPackageFile = - getPackageFile(Function.FunctionDetails.ComponentType.FUNCTION, transformFunction); + functionPackageFile = getPackageFile(transformFunction); } Function.PackageLocationMetaData.Builder functionPackageLocation = getFunctionPackageLocation(functionMetaDataBuilder.build(), @@ -746,8 +744,7 @@ private Function.FunctionDetails validateUpdateRequestParams(final String tenant transformFunctionPackage = getBuiltinFunctionPackage(sinkConfig.getTransformFunction()); if (transformFunctionPackage == null) { - File functionPackageFile = getPackageFile(Function.FunctionDetails.ComponentType.FUNCTION, - sinkConfig.getTransformFunction()); + File functionPackageFile = getPackageFile(sinkConfig.getTransformFunction()); transformFunctionPackage = new FunctionFilePackage(functionPackageFile, workerConfig.getNarExtractionDirectory(), workerConfig.getEnableClassloadingOfExternalFiles(), ConnectorDefinition.class); diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java index 5191306146951..dea69698dd28d 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java @@ -143,7 +143,7 @@ public void registerSource(final String tenant, // validate parameters try { if (isPkgUrlProvided) { - componentPackageFile = getPackageFile(componentType, sourcePkgUrl); + componentPackageFile = getPackageFile(sourcePkgUrl); functionDetails = validateUpdateRequestParams(tenant, namespace, sourceName, sourceConfig, componentPackageFile); } else { @@ -304,7 +304,6 @@ public void updateSource(final String tenant, // validate parameters try { componentPackageFile = getPackageFile( - componentType, sourcePkgUrl, existingComponent.getPackageLocation().getPackagePath(), uploadedInputStream); diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java index ac5ca617ea43b..4e4c3d2f234aa 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java @@ -30,7 +30,6 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNull; import static org.testng.AssertJUnit.fail; -import java.util.List; import java.util.Map; import java.util.Optional; import org.apache.distributedlog.api.namespace.Namespace; @@ -78,8 +77,7 @@ public void testStartFunctionWithDLNamespace() throws Exception { @SuppressWarnings("resource") FunctionActioner actioner = new FunctionActioner(workerConfig, factory, dlogNamespace, - new ConnectorsManager(workerConfig), new FunctionsManager(workerConfig), mock(PulsarAdmin.class), - mock(PackageUrlValidator.class)); + new ConnectorsManager(workerConfig), new FunctionsManager(workerConfig), mock(PulsarAdmin.class)); Function.FunctionMetaData function1 = Function.FunctionMetaData.newBuilder() .setFunctionDetails(Function.FunctionDetails.newBuilder().setTenant("test-tenant") .setNamespace("test-namespace").setName("func-1")) @@ -111,8 +109,6 @@ public void testStartFunctionWithPkgUrl() throws Exception { workerConfig.setPulsarServiceUrl("pulsar://localhost:6650"); workerConfig.setStateStorageServiceUrl("foo"); workerConfig.setFunctionAssignmentTopicName("assignments"); - workerConfig.setAdditionalEnabledFunctionsUrlPatterns(List.of("file:///user/.*", "http://invalid/.*")); - workerConfig.setAdditionalEnabledConnectorUrlPatterns(List.of("file:///user/.*", "http://invalid/.*")); String downloadDir = this.getClass().getProtectionDomain().getCodeSource().getLocation().getPath(); workerConfig.setDownloadDirectory(downloadDir); @@ -126,12 +122,11 @@ public void testStartFunctionWithPkgUrl() throws Exception { @SuppressWarnings("resource") FunctionActioner actioner = new FunctionActioner(workerConfig, factory, dlogNamespace, - new ConnectorsManager(workerConfig), new FunctionsManager(workerConfig), mock(PulsarAdmin.class), - new PackageUrlValidator(workerConfig)); + new ConnectorsManager(workerConfig), new FunctionsManager(workerConfig), mock(PulsarAdmin.class)); // (1) test with file url. functionActioner should be able to consider file-url and it should be able to call // RuntimeSpawner - String pkgPathLocation = FILE + ":///user/my-file.jar"; + String pkgPathLocation = FILE + ":/user/my-file.jar"; startFunction(actioner, pkgPathLocation, pkgPathLocation); verify(runtime, times(1)).start(); @@ -199,8 +194,7 @@ public void testFunctionAuthDisabled() throws Exception { @SuppressWarnings("resource") FunctionActioner actioner = new FunctionActioner(workerConfig, factory, dlogNamespace, - new ConnectorsManager(workerConfig), new FunctionsManager(workerConfig), mock(PulsarAdmin.class), - mock(PackageUrlValidator.class)); + new ConnectorsManager(workerConfig), new FunctionsManager(workerConfig), mock(PulsarAdmin.class)); String pkgPathLocation = "http://invalid/my-file.jar"; @@ -263,8 +257,7 @@ public void testStartFunctionWithPackageUrl() throws Exception { @SuppressWarnings("resource") FunctionActioner actioner = new FunctionActioner(workerConfig, factory, dlogNamespace, - new ConnectorsManager(workerConfig), new FunctionsManager(workerConfig), pulsarAdmin, - mock(PackageUrlValidator.class)); + new ConnectorsManager(workerConfig), new FunctionsManager(workerConfig), pulsarAdmin); // (1) test with file url. functionActioner should be able to consider file-url and it should be able to call // RuntimeSpawner diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java index 7b57a1c896539..bc56b1766d39d 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java @@ -720,7 +720,7 @@ public void testExternallyManagedRuntimeUpdate() throws Exception { FunctionActioner functionActioner = spy(new FunctionActioner( workerConfig, - kubernetesRuntimeFactory, null, null, null, null, workerService.getPackageUrlValidator())); + kubernetesRuntimeFactory, null, null, null, null)); try (final MockedStatic runtimeFactoryMockedStatic = Mockito .mockStatic(RuntimeFactory.class);) { diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/AbstractFunctionApiResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/AbstractFunctionApiResourceTest.java index 388331ce6f241..5845ff3afd9ac 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/AbstractFunctionApiResourceTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/AbstractFunctionApiResourceTest.java @@ -29,7 +29,6 @@ import java.io.File; import java.io.IOException; import java.io.InputStream; -import java.lang.reflect.Method; import java.util.HashMap; import java.util.LinkedList; import java.util.List; @@ -60,12 +59,6 @@ import org.testng.annotations.Test; public abstract class AbstractFunctionApiResourceTest extends AbstractFunctionsResourceTest { - @Override - protected void customizeWorkerConfig(WorkerConfig workerConfig, Method method) { - if (method.getName().contains("Upload")) { - workerConfig.setFunctionsWorkerEnablePackageManagement(false); - } - } @Test public void testListFunctionsSuccess() { diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/AbstractFunctionsResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/AbstractFunctionsResourceTest.java index 51ca4c83f9e02..4cc4ed0b09819 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/AbstractFunctionsResourceTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/AbstractFunctionsResourceTest.java @@ -28,7 +28,6 @@ import java.io.IOException; import java.io.InputStream; import java.io.UncheckedIOException; -import java.lang.reflect.Method; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; @@ -64,7 +63,6 @@ import org.apache.pulsar.functions.worker.FunctionRuntimeManager; import org.apache.pulsar.functions.worker.FunctionsManager; import org.apache.pulsar.functions.worker.LeaderService; -import org.apache.pulsar.functions.worker.PackageUrlValidator; import org.apache.pulsar.functions.worker.PulsarWorkerService; import org.apache.pulsar.functions.worker.WorkerConfig; import org.apache.pulsar.functions.worker.WorkerUtils; @@ -143,7 +141,7 @@ public static File getPulsarApiExamplesNar() { } @BeforeMethod - public final void setup(Method method) throws Exception { + public final void setup() throws Exception { this.mockedManager = mock(FunctionMetaDataManager.class); this.mockedFunctionRunTimeManager = mock(FunctionRuntimeManager.class); this.mockedRuntimeFactory = mock(RuntimeFactory.class); @@ -183,33 +181,21 @@ public final void setup(Method method) throws Exception { }).when(mockedPackages).download(any(), any()); // worker config - List urlPatterns = - List.of("http://localhost.*", "file:.*", "https://repo1.maven.org/maven2/org/apache/pulsar/.*"); WorkerConfig workerConfig = new WorkerConfig() .setWorkerId("test") .setWorkerPort(8080) .setFunctionMetadataTopicName("pulsar/functions") .setNumFunctionPackageReplicas(3) - .setPulsarServiceUrl("pulsar://localhost:6650/") - .setAdditionalEnabledFunctionsUrlPatterns(urlPatterns) - .setAdditionalEnabledConnectorUrlPatterns(urlPatterns) - .setFunctionsWorkerEnablePackageManagement(true); - customizeWorkerConfig(workerConfig, method); + .setPulsarServiceUrl("pulsar://localhost:6650/"); tempDirectory = PulsarFunctionTestTemporaryDirectory.create(getClass().getSimpleName()); tempDirectory.useTemporaryDirectoriesForWorkerConfig(workerConfig); when(mockedWorkerService.getWorkerConfig()).thenReturn(workerConfig); when(mockedWorkerService.getFunctionsManager()).thenReturn(functionsManager); when(mockedWorkerService.getConnectorsManager()).thenReturn(connectorsManager); - PackageUrlValidator packageUrlValidator = new PackageUrlValidator(workerConfig); - when(mockedWorkerService.getPackageUrlValidator()).thenReturn(packageUrlValidator); doSetup(); } - protected void customizeWorkerConfig(WorkerConfig workerConfig, Method method) { - - } - protected File getDefaultNarFile() { return getPulsarIOTwitterNar(); } diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java index c6c6303e48007..b9833380d7087 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java @@ -36,7 +36,6 @@ import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; -import java.lang.reflect.Method; import java.util.Collections; import java.util.HashMap; import java.util.LinkedList; @@ -59,7 +58,6 @@ import org.apache.pulsar.functions.proto.Function.FunctionDetails; import org.apache.pulsar.functions.proto.Function.FunctionMetaData; import org.apache.pulsar.functions.utils.SinkConfigUtils; -import org.apache.pulsar.functions.worker.WorkerConfig; import org.apache.pulsar.functions.worker.WorkerUtils; import org.apache.pulsar.functions.worker.rest.api.SinksImpl; import org.glassfish.jersey.media.multipart.FormDataContentDisposition; @@ -82,12 +80,6 @@ protected void doSetup() { this.resource = spy(new SinksImpl(() -> mockedWorkerService)); } - @Override - protected void customizeWorkerConfig(WorkerConfig workerConfig, Method method) { - if (method.getName().contains("Upload") || method.getName().contains("BKPackage")) { - workerConfig.setFunctionsWorkerEnablePackageManagement(false); - } - } @Override protected Function.FunctionDetails.ComponentType getComponentType() { return Function.FunctionDetails.ComponentType.SINK; @@ -1494,15 +1486,17 @@ public void testRegisterSinkSuccessK8sNoUpload() throws Exception { SinkConfig sinkConfig = createDefaultSinkConfig(); sinkConfig.setArchive("builtin://cassandra"); - resource.registerSink( - tenant, - namespace, - sink, - null, - mockedFormData, - null, - sinkConfig, - null); + try (FileInputStream inputStream = new FileInputStream(getPulsarIOCassandraNar())) { + resource.registerSink( + tenant, + namespace, + sink, + inputStream, + mockedFormData, + null, + sinkConfig, + null); + } } /* @@ -1532,19 +1526,21 @@ public void testRegisterSinkSuccessK8sWithUpload() throws Exception { SinkConfig sinkConfig = createDefaultSinkConfig(); sinkConfig.setArchive("builtin://cassandra"); - try { - resource.registerSink( - tenant, - namespace, - sink, - null, - mockedFormData, - null, - sinkConfig, - null); - Assert.fail(); - } catch (RuntimeException e) { - Assert.assertEquals(e.getMessage(), injectedErrMsg); + try (FileInputStream inputStream = new FileInputStream(getPulsarIOCassandraNar())) { + try { + resource.registerSink( + tenant, + namespace, + sink, + inputStream, + mockedFormData, + null, + sinkConfig, + null); + Assert.fail(); + } catch (RuntimeException e) { + Assert.assertEquals(e.getMessage(), injectedErrMsg); + } } } diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java index f02acbd3663bf..c7e69484d3019 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java @@ -34,7 +34,6 @@ import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; -import java.lang.reflect.Method; import java.util.LinkedList; import java.util.List; import javax.ws.rs.core.Response; @@ -58,7 +57,6 @@ import org.apache.pulsar.functions.source.TopicSchema; import org.apache.pulsar.functions.utils.SourceConfigUtils; import org.apache.pulsar.functions.utils.io.ConnectorUtils; -import org.apache.pulsar.functions.worker.WorkerConfig; import org.apache.pulsar.functions.worker.WorkerUtils; import org.apache.pulsar.functions.worker.rest.api.SourcesImpl; import org.glassfish.jersey.media.multipart.FormDataContentDisposition; @@ -82,13 +80,6 @@ protected void doSetup() { this.resource = spy(new SourcesImpl(() -> mockedWorkerService)); } - @Override - protected void customizeWorkerConfig(WorkerConfig workerConfig, Method method) { - if (method.getName().endsWith("UploadFailure") || method.getName().contains("BKPackage")) { - workerConfig.setFunctionsWorkerEnablePackageManagement(false); - } - } - @Override protected FunctionDetails.ComponentType getComponentType() { return FunctionDetails.ComponentType.SOURCE;