Skip to content

Commit

Permalink
[improve][fn] Add configuration for connector & functions package url…
Browse files Browse the repository at this point in the history
… sources (apache#22184)
  • Loading branch information
lhotari authored Mar 4, 2024
1 parent 8c7c978 commit 207335a
Show file tree
Hide file tree
Showing 20 changed files with 282 additions and 55 deletions.
8 changes: 8 additions & 0 deletions conf/functions_worker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,15 @@ saslJaasServerRoleTokenSignerSecretPath:
########################

connectorsDirectory: ./connectors
# Whether to enable referencing connectors directory files by file url in connector (sink/source) creation
enableReferencingConnectorDirectoryFiles: true
# Regex patterns for enabling creation of connectors by referencing packages in matching http/https urls
additionalEnabledConnectorUrlPatterns: []
functionsDirectory: ./functions
# Whether to enable referencing functions directory files by file url in functions creation
enableReferencingFunctionsDirectoryFiles: true
# Regex patterns for enabling creation of functions by referencing packages in matching http/https urls
additionalEnabledFunctionsUrlPatterns: []

# Enables extended validation for connector config with fine-grain annotation based validation
# during submission. Classloading with either enableClassloadingOfExternalFiles or
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.net.URL;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
Expand Down Expand Up @@ -267,6 +268,11 @@ private PulsarWorkerService createPulsarFunctionWorker(ServiceConfiguration conf
workerConfig.setAuthorizationEnabled(config.isAuthorizationEnabled());
workerConfig.setAuthorizationProvider(config.getAuthorizationProvider());

List<String> urlPatterns =
List.of(getPulsarApiExamplesJar().getParentFile().toURI() + ".*", "http://127\\.0\\.0\\.1:.*");
workerConfig.setAdditionalEnabledConnectorUrlPatterns(urlPatterns);
workerConfig.setAdditionalEnabledFunctionsUrlPatterns(urlPatterns);

PulsarWorkerService workerService = new PulsarWorkerService();
return workerService;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,10 @@ private PulsarWorkerService createPulsarFunctionWorker(ServiceConfiguration conf
workerConfig.setAuthenticationEnabled(true);
workerConfig.setAuthorizationEnabled(true);

List<String> urlPatterns = List.of(getPulsarApiExamplesJar().getParentFile().toURI() + ".*");
workerConfig.setAdditionalEnabledConnectorUrlPatterns(urlPatterns);
workerConfig.setAdditionalEnabledFunctionsUrlPatterns(urlPatterns);

PulsarWorkerService workerService = new PulsarWorkerService();
return workerService;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@

import static org.apache.pulsar.common.util.PortManager.nextLockedFreePort;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Sets;
Expand All @@ -32,6 +32,7 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
Expand Down Expand Up @@ -153,6 +154,12 @@ void setup() throws Exception {
workerConfig.setUseTls(true);
workerConfig.setTlsEnableHostnameVerification(true);
workerConfig.setTlsAllowInsecureConnection(false);
File packagePath = new File(
PulsarSink.class.getProtectionDomain().getCodeSource().getLocation().getPath()).getParentFile();
List<String> urlPatterns =
List.of(packagePath.toURI() + ".*");
workerConfig.setAdditionalEnabledConnectorUrlPatterns(urlPatterns);
workerConfig.setAdditionalEnabledFunctionsUrlPatterns(urlPatterns);
fnWorkerServices[i] = WorkerServiceLoader.load(workerConfig);

configurations[i] = config;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
Expand Down Expand Up @@ -306,6 +307,11 @@ private PulsarWorkerService createPulsarFunctionWorker(ServiceConfiguration conf
workerConfig.setAuthenticationEnabled(true);
workerConfig.setAuthorizationEnabled(true);

List<String> urlPatterns =
List.of(getPulsarApiExamplesJar().getParentFile().toURI() + ".*", "http://127\\.0\\.0\\.1:.*");
workerConfig.setAdditionalEnabledConnectorUrlPatterns(urlPatterns);
workerConfig.setAdditionalEnabledFunctionsUrlPatterns(urlPatterns);

PulsarWorkerService workerService = new PulsarWorkerService();
return workerService;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,18 @@ public class WorkerConfig implements Serializable, PulsarConfiguration {
doc = "The directory where nar packages are extractors"
)
private String narExtractionDirectory = NarClassLoader.DEFAULT_NAR_EXTRACTION_DIR;
@FieldContext(
category = CATEGORY_CONNECTORS,
doc = "Whether to enable referencing connectors directory files by file url in connector (sink/source) "
+ "creation. Default is true."
)
private Boolean enableReferencingConnectorDirectoryFiles = true;
@FieldContext(
category = CATEGORY_FUNCTIONS,
doc = "Regex patterns for enabling creation of connectors by referencing packages in matching http/https "
+ "urls."
)
private List<String> additionalEnabledConnectorUrlPatterns = new ArrayList<>();
@FieldContext(
category = CATEGORY_CONNECTORS,
doc = "Enables extended validation for connector config with fine-grain annotation based validation "
Expand All @@ -282,6 +294,18 @@ public class WorkerConfig implements Serializable, PulsarConfiguration {
doc = "The path to the location to locate builtin functions"
)
private String functionsDirectory = "./functions";
@FieldContext(
category = CATEGORY_FUNCTIONS,
doc = "Whether to enable referencing functions directory files by file url in functions creation. "
+ "Default is true."
)
private Boolean enableReferencingFunctionsDirectoryFiles = true;
@FieldContext(
category = CATEGORY_FUNCTIONS,
doc = "Regex patterns for enabling creation of functions by referencing packages in matching http/https "
+ "urls."
)
private List<String> additionalEnabledFunctionsUrlPatterns = new ArrayList<>();
@FieldContext(
category = CATEGORY_FUNC_METADATA_MNG,
doc = "The Pulsar topic used for storing function metadata"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,18 +83,21 @@ public class FunctionActioner {
private final ConnectorsManager connectorsManager;
private final FunctionsManager functionsManager;
private final PulsarAdmin pulsarAdmin;
private final PackageUrlValidator packageUrlValidator;

public FunctionActioner(WorkerConfig workerConfig,
RuntimeFactory runtimeFactory,
Namespace dlogNamespace,
ConnectorsManager connectorsManager,
FunctionsManager functionsManager, PulsarAdmin pulsarAdmin) {
FunctionsManager functionsManager, PulsarAdmin pulsarAdmin,
PackageUrlValidator packageUrlValidator) {
this.workerConfig = workerConfig;
this.runtimeFactory = runtimeFactory;
this.dlogNamespace = dlogNamespace;
this.connectorsManager = connectorsManager;
this.functionsManager = functionsManager;
this.pulsarAdmin = pulsarAdmin;
this.packageUrlValidator = packageUrlValidator;
}


Expand Down Expand Up @@ -152,6 +155,9 @@ private String getPackageFile(FunctionMetaData functionMetaData, FunctionDetails
boolean isPkgUrlProvided = isFunctionPackageUrlSupported(packagePath);
String packageFile;
if (isPkgUrlProvided && packagePath.startsWith(FILE)) {
if (!packageUrlValidator.isValidPackageUrl(componentType, packagePath)) {
throw new IllegalArgumentException("Package URL " + packagePath + " is not valid");
}
URL url = new URL(packagePath);
File pkgFile = new File(url.toURI());
packageFile = pkgFile.getAbsolutePath();
Expand All @@ -168,7 +174,7 @@ private String getPackageFile(FunctionMetaData functionMetaData, FunctionDetails
pkgDir,
new File(getDownloadFileName(functionMetaData.getFunctionDetails(),
pkgLocation)).getName());
downloadFile(pkgFile, isPkgUrlProvided, functionMetaData, instanceId, pkgLocation);
downloadFile(pkgFile, isPkgUrlProvided, functionMetaData, instanceId, pkgLocation, componentType);
packageFile = pkgFile.getAbsolutePath();
}
return packageFile;
Expand Down Expand Up @@ -227,7 +233,8 @@ InstanceConfig createInstanceConfig(FunctionDetails functionDetails, Function.Fu
}

private void downloadFile(File pkgFile, boolean isPkgUrlProvided, FunctionMetaData functionMetaData,
int instanceId, Function.PackageLocationMetaData pkgLocation)
int instanceId, Function.PackageLocationMetaData pkgLocation,
FunctionDetails.ComponentType componentType)
throws IOException, PulsarAdminException {

FunctionDetails details = functionMetaData.getFunctionDetails();
Expand All @@ -252,6 +259,9 @@ private void downloadFile(File pkgFile, boolean isPkgUrlProvided, FunctionMetaDa
downloadFromHttp ? pkgLocationPath : pkgLocation);

if (downloadFromHttp) {
if (!packageUrlValidator.isValidPackageUrl(componentType, pkgLocationPath)) {
throw new IllegalArgumentException("Package URL " + pkgLocationPath + " is not valid");
}
FunctionCommon.downloadFromHttpUrl(pkgLocationPath, tempPkgFile);
} else if (downloadFromPackageManagementService) {
getPulsarAdmin().packages().download(pkgLocationPath, tempPkgFile.getPath());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,8 @@ public FunctionRuntimeManager(WorkerConfig workerConfig, PulsarWorkerService wor
functionAuthProvider, runtimeCustomizer);

this.functionActioner = new FunctionActioner(this.workerConfig, runtimeFactory,
dlogNamespace, connectorsManager, functionsManager, workerService.getBrokerAdmin());
dlogNamespace, connectorsManager, functionsManager, workerService.getBrokerAdmin(),
workerService.getPackageUrlValidator());

this.membershipManager = membershipManager;
this.functionMetaDataManager = functionMetaDataManager;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.functions.worker;

import java.net.URI;
import java.nio.file.Path;
import java.util.Collections;
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.pulsar.functions.proto.Function;

/**
* Validates package URLs for functions and connectors.
* Validates that the package URL is either a file in the connectors or functions directory
* when referencing connector or function files is enabled, or matches one of the additional url patterns.
*/
public class PackageUrlValidator {
private final Path connectionsDirectory;
private final Path functionsDirectory;
private final List<Pattern> additionalConnectionsPatterns;
private final List<Pattern> additionalFunctionsPatterns;

public PackageUrlValidator(WorkerConfig workerConfig) {
this.connectionsDirectory = resolveDirectory(workerConfig.getEnableReferencingConnectorDirectoryFiles(),
workerConfig.getConnectorsDirectory());
this.functionsDirectory = resolveDirectory(workerConfig.getEnableReferencingFunctionsDirectoryFiles(),
workerConfig.getFunctionsDirectory());
this.additionalConnectionsPatterns =
compilePatterns(workerConfig.getAdditionalEnabledConnectorUrlPatterns());
this.additionalFunctionsPatterns =
compilePatterns(workerConfig.getAdditionalEnabledFunctionsUrlPatterns());
}

private static Path resolveDirectory(Boolean enabled, String directory) {
return enabled != null && enabled
? Path.of(directory).normalize().toAbsolutePath() : null;
}

private static List<Pattern> compilePatterns(List<String> additionalPatterns) {
return additionalPatterns != null ? additionalPatterns.stream().map(Pattern::compile).collect(
Collectors.toList()) : Collections.emptyList();
}

boolean isValidFunctionsPackageUrl(URI functionPkgUrl) {
return doesMatch(functionPkgUrl, functionsDirectory, additionalFunctionsPatterns);
}

boolean isValidConnectionsPackageUrl(URI functionPkgUrl) {
return doesMatch(functionPkgUrl, connectionsDirectory, additionalConnectionsPatterns);
}

private boolean doesMatch(URI functionPkgUrl, Path directory, List<Pattern> patterns) {
if (directory != null && "file".equals(functionPkgUrl.getScheme())) {
Path filePath = Path.of(functionPkgUrl.getPath()).normalize().toAbsolutePath();
if (filePath.startsWith(directory)) {
return true;
}
}
String functionPkgUrlString = functionPkgUrl.normalize().toString();
for (Pattern pattern : patterns) {
if (pattern.matcher(functionPkgUrlString).matches()) {
return true;
}
}
return false;
}

public boolean isValidPackageUrl(Function.FunctionDetails.ComponentType componentType, String functionPkgUrl) {
URI uri = URI.create(functionPkgUrl);
if (componentType == null) {
// if component type is not specified, we need to check both functions and connections
return isValidFunctionsPackageUrl(uri) || isValidConnectionsPackageUrl(uri);
}
switch (componentType) {
case FUNCTION:
return isValidFunctionsPackageUrl(uri);
case SINK:
case SOURCE:
return isValidConnectionsPackageUrl(uri);
default:
throw new IllegalArgumentException("Unknown component type: " + componentType);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,8 @@ public interface PulsarClientCreator {
private Sinks<PulsarWorkerService> sinks;
private Sources<PulsarWorkerService> sources;
private Workers<PulsarWorkerService> workers;

@Getter
private PackageUrlValidator packageUrlValidator;
private final PulsarClientCreator clientCreator;
private StateStoreProvider stateStoreProvider;

Expand Down Expand Up @@ -198,6 +199,7 @@ public void init(WorkerConfig workerConfig,
this.sinks = new SinksImpl(() -> PulsarWorkerService.this);
this.sources = new SourcesImpl(() -> PulsarWorkerService.this);
this.workers = new WorkerImpl(() -> PulsarWorkerService.this);
this.packageUrlValidator = new PackageUrlValidator(workerConfig);
}

@Override
Expand Down
Loading

0 comments on commit 207335a

Please sign in to comment.