Skip to content

Commit

Permalink
Revert "[improve][fn] Add configuration for connector & functions pac…
Browse files Browse the repository at this point in the history
…kage url sources (apache#22184)"

This reverts commit 2d19267.
  • Loading branch information
mukesh-ctds committed Mar 5, 2024
1 parent 6d379b9 commit 8f595b8
Show file tree
Hide file tree
Showing 20 changed files with 55 additions and 282 deletions.
8 changes: 0 additions & 8 deletions conf/functions_worker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -408,15 +408,7 @@ saslJaasServerRoleTokenSignerSecretPath:
########################

connectorsDirectory: ./connectors
# Whether to enable referencing connectors directory files by file url in connector (sink/source) creation
enableReferencingConnectorDirectoryFiles: true
# Regex patterns for enabling creation of connectors by referencing packages in matching http/https urls
additionalEnabledConnectorUrlPatterns: []
functionsDirectory: ./functions
# Whether to enable referencing functions directory files by file url in functions creation
enableReferencingFunctionsDirectoryFiles: true
# Regex patterns for enabling creation of functions by referencing packages in matching http/https urls
additionalEnabledFunctionsUrlPatterns: []

# Enables extended validation for connector config with fine-grain annotation based validation
# during submission. Classloading with either enableClassloadingOfExternalFiles or
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import java.net.URL;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
Expand Down Expand Up @@ -268,11 +267,6 @@ private PulsarWorkerService createPulsarFunctionWorker(ServiceConfiguration conf
workerConfig.setAuthorizationEnabled(config.isAuthorizationEnabled());
workerConfig.setAuthorizationProvider(config.getAuthorizationProvider());

List<String> urlPatterns =
List.of(getPulsarApiExamplesJar().getParentFile().toURI() + ".*", "http://127\\.0\\.0\\.1:.*");
workerConfig.setAdditionalEnabledConnectorUrlPatterns(urlPatterns);
workerConfig.setAdditionalEnabledFunctionsUrlPatterns(urlPatterns);

PulsarWorkerService workerService = new PulsarWorkerService();
return workerService;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,10 +268,6 @@ private PulsarWorkerService createPulsarFunctionWorker(ServiceConfiguration conf
workerConfig.setAuthenticationEnabled(true);
workerConfig.setAuthorizationEnabled(true);

List<String> urlPatterns = List.of(getPulsarApiExamplesJar().getParentFile().toURI() + ".*");
workerConfig.setAdditionalEnabledConnectorUrlPatterns(urlPatterns);
workerConfig.setAdditionalEnabledFunctionsUrlPatterns(urlPatterns);

PulsarWorkerService workerService = new PulsarWorkerService();
return workerService;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@

import static org.apache.pulsar.common.util.PortManager.nextLockedFreePort;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.assertNotNull;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Sets;
Expand All @@ -32,7 +32,6 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
Expand Down Expand Up @@ -154,12 +153,6 @@ void setup() throws Exception {
workerConfig.setUseTls(true);
workerConfig.setTlsEnableHostnameVerification(true);
workerConfig.setTlsAllowInsecureConnection(false);
File packagePath = new File(
PulsarSink.class.getProtectionDomain().getCodeSource().getLocation().getPath()).getParentFile();
List<String> urlPatterns =
List.of(packagePath.toURI() + ".*");
workerConfig.setAdditionalEnabledConnectorUrlPatterns(urlPatterns);
workerConfig.setAdditionalEnabledFunctionsUrlPatterns(urlPatterns);
fnWorkerServices[i] = WorkerServiceLoader.load(workerConfig);

configurations[i] = config;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
Expand Down Expand Up @@ -307,11 +306,6 @@ private PulsarWorkerService createPulsarFunctionWorker(ServiceConfiguration conf
workerConfig.setAuthenticationEnabled(true);
workerConfig.setAuthorizationEnabled(true);

List<String> urlPatterns =
List.of(getPulsarApiExamplesJar().getParentFile().toURI() + ".*", "http://127\\.0\\.0\\.1:.*");
workerConfig.setAdditionalEnabledConnectorUrlPatterns(urlPatterns);
workerConfig.setAdditionalEnabledFunctionsUrlPatterns(urlPatterns);

PulsarWorkerService workerService = new PulsarWorkerService();
return workerService;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,18 +264,6 @@ public class WorkerConfig implements Serializable, PulsarConfiguration {
doc = "The directory where nar packages are extractors"
)
private String narExtractionDirectory = NarClassLoader.DEFAULT_NAR_EXTRACTION_DIR;
@FieldContext(
category = CATEGORY_CONNECTORS,
doc = "Whether to enable referencing connectors directory files by file url in connector (sink/source) "
+ "creation. Default is true."
)
private Boolean enableReferencingConnectorDirectoryFiles = true;
@FieldContext(
category = CATEGORY_FUNCTIONS,
doc = "Regex patterns for enabling creation of connectors by referencing packages in matching http/https "
+ "urls."
)
private List<String> additionalEnabledConnectorUrlPatterns = new ArrayList<>();
@FieldContext(
category = CATEGORY_CONNECTORS,
doc = "Enables extended validation for connector config with fine-grain annotation based validation "
Expand All @@ -294,18 +282,6 @@ public class WorkerConfig implements Serializable, PulsarConfiguration {
doc = "The path to the location to locate builtin functions"
)
private String functionsDirectory = "./functions";
@FieldContext(
category = CATEGORY_FUNCTIONS,
doc = "Whether to enable referencing functions directory files by file url in functions creation. "
+ "Default is true."
)
private Boolean enableReferencingFunctionsDirectoryFiles = true;
@FieldContext(
category = CATEGORY_FUNCTIONS,
doc = "Regex patterns for enabling creation of functions by referencing packages in matching http/https "
+ "urls."
)
private List<String> additionalEnabledFunctionsUrlPatterns = new ArrayList<>();
@FieldContext(
category = CATEGORY_FUNC_METADATA_MNG,
doc = "The Pulsar topic used for storing function metadata"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,21 +83,18 @@ public class FunctionActioner {
private final ConnectorsManager connectorsManager;
private final FunctionsManager functionsManager;
private final PulsarAdmin pulsarAdmin;
private final PackageUrlValidator packageUrlValidator;

public FunctionActioner(WorkerConfig workerConfig,
RuntimeFactory runtimeFactory,
Namespace dlogNamespace,
ConnectorsManager connectorsManager,
FunctionsManager functionsManager, PulsarAdmin pulsarAdmin,
PackageUrlValidator packageUrlValidator) {
FunctionsManager functionsManager, PulsarAdmin pulsarAdmin) {
this.workerConfig = workerConfig;
this.runtimeFactory = runtimeFactory;
this.dlogNamespace = dlogNamespace;
this.connectorsManager = connectorsManager;
this.functionsManager = functionsManager;
this.pulsarAdmin = pulsarAdmin;
this.packageUrlValidator = packageUrlValidator;
}


Expand Down Expand Up @@ -155,9 +152,6 @@ private String getPackageFile(FunctionMetaData functionMetaData, FunctionDetails
boolean isPkgUrlProvided = isFunctionPackageUrlSupported(packagePath);
String packageFile;
if (isPkgUrlProvided && packagePath.startsWith(FILE)) {
if (!packageUrlValidator.isValidPackageUrl(componentType, packagePath)) {
throw new IllegalArgumentException("Package URL " + packagePath + " is not valid");
}
URL url = new URL(packagePath);
File pkgFile = new File(url.toURI());
packageFile = pkgFile.getAbsolutePath();
Expand All @@ -174,7 +168,7 @@ private String getPackageFile(FunctionMetaData functionMetaData, FunctionDetails
pkgDir,
new File(getDownloadFileName(functionMetaData.getFunctionDetails(),
pkgLocation)).getName());
downloadFile(pkgFile, isPkgUrlProvided, functionMetaData, instanceId, pkgLocation, componentType);
downloadFile(pkgFile, isPkgUrlProvided, functionMetaData, instanceId, pkgLocation);
packageFile = pkgFile.getAbsolutePath();
}
return packageFile;
Expand Down Expand Up @@ -233,8 +227,7 @@ InstanceConfig createInstanceConfig(FunctionDetails functionDetails, Function.Fu
}

private void downloadFile(File pkgFile, boolean isPkgUrlProvided, FunctionMetaData functionMetaData,
int instanceId, Function.PackageLocationMetaData pkgLocation,
FunctionDetails.ComponentType componentType)
int instanceId, Function.PackageLocationMetaData pkgLocation)
throws IOException, PulsarAdminException {

FunctionDetails details = functionMetaData.getFunctionDetails();
Expand All @@ -259,9 +252,6 @@ private void downloadFile(File pkgFile, boolean isPkgUrlProvided, FunctionMetaDa
downloadFromHttp ? pkgLocationPath : pkgLocation);

if (downloadFromHttp) {
if (!packageUrlValidator.isValidPackageUrl(componentType, pkgLocationPath)) {
throw new IllegalArgumentException("Package URL " + pkgLocationPath + " is not valid");
}
FunctionCommon.downloadFromHttpUrl(pkgLocationPath, tempPkgFile);
} else if (downloadFromPackageManagementService) {
getPulsarAdmin().packages().download(pkgLocationPath, tempPkgFile.getPath());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,8 +220,7 @@ public FunctionRuntimeManager(WorkerConfig workerConfig, PulsarWorkerService wor
functionAuthProvider, runtimeCustomizer);

this.functionActioner = new FunctionActioner(this.workerConfig, runtimeFactory,
dlogNamespace, connectorsManager, functionsManager, workerService.getBrokerAdmin(),
workerService.getPackageUrlValidator());
dlogNamespace, connectorsManager, functionsManager, workerService.getBrokerAdmin());

this.membershipManager = membershipManager;
this.functionMetaDataManager = functionMetaDataManager;
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,7 @@ public interface PulsarClientCreator {
private Sinks<PulsarWorkerService> sinks;
private Sources<PulsarWorkerService> sources;
private Workers<PulsarWorkerService> workers;
@Getter
private PackageUrlValidator packageUrlValidator;

private final PulsarClientCreator clientCreator;

public PulsarWorkerService() {
Expand Down Expand Up @@ -197,7 +196,6 @@ public void init(WorkerConfig workerConfig,
this.sinks = new SinksImpl(() -> PulsarWorkerService.this);
this.sources = new SourcesImpl(() -> PulsarWorkerService.this);
this.workers = new WorkerImpl(() -> PulsarWorkerService.this);
this.packageUrlValidator = new PackageUrlValidator(workerConfig);
}

@Override
Expand Down
Loading

0 comments on commit 8f595b8

Please sign in to comment.