Skip to content

Commit

Permalink
[improve][fn] Add configuration for connector & functions package url…
Browse files Browse the repository at this point in the history
… sources (#22184)

(cherry picked from commit 207335a)
  • Loading branch information
lhotari committed Mar 4, 2024
1 parent fdf2be1 commit b107387
Show file tree
Hide file tree
Showing 20 changed files with 357 additions and 188 deletions.
8 changes: 8 additions & 0 deletions conf/functions_worker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,15 @@ saslJaasServerRoleTokenSignerSecretPath:
########################

connectorsDirectory: ./connectors
# Whether to enable referencing connectors directory files by file url in connector (sink/source) creation
enableReferencingConnectorDirectoryFiles: true
# Regex patterns for enabling creation of connectors by referencing packages in matching http/https urls
additionalEnabledConnectorUrlPatterns: []
functionsDirectory: ./functions
# Whether to enable referencing functions directory files by file url in functions creation
enableReferencingFunctionsDirectoryFiles: true
# Regex patterns for enabling creation of functions by referencing packages in matching http/https urls
additionalEnabledFunctionsUrlPatterns: []

# Enables extended validation for connector config with fine-grain annotation based validation
# during submission. Classloading with either enableClassloadingOfExternalFiles or
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.net.URL;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
Expand Down Expand Up @@ -266,6 +267,11 @@ private PulsarWorkerService createPulsarFunctionWorker(ServiceConfiguration conf
workerConfig.setAuthorizationEnabled(config.isAuthorizationEnabled());
workerConfig.setAuthorizationProvider(config.getAuthorizationProvider());

List<String> urlPatterns =
List.of(getPulsarApiExamplesJar().getParentFile().toURI() + ".*", "http://127\\.0\\.0\\.1:.*");
workerConfig.setAdditionalEnabledConnectorUrlPatterns(urlPatterns);
workerConfig.setAdditionalEnabledFunctionsUrlPatterns(urlPatterns);

PulsarWorkerService workerService = new PulsarWorkerService();
workerService.init(workerConfig, null, false);
return workerService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,10 @@ private PulsarWorkerService createPulsarFunctionWorker(ServiceConfiguration conf
workerConfig.setAuthenticationEnabled(true);
workerConfig.setAuthorizationEnabled(true);

List<String> urlPatterns = List.of(getPulsarApiExamplesJar().getParentFile().toURI() + ".*");
workerConfig.setAdditionalEnabledConnectorUrlPatterns(urlPatterns);
workerConfig.setAdditionalEnabledFunctionsUrlPatterns(urlPatterns);

PulsarWorkerService workerService = new PulsarWorkerService();
workerService.init(workerConfig, null, false);
return workerService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@

import static org.apache.pulsar.common.util.PortManager.nextLockedFreePort;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Sets;
Expand All @@ -32,6 +32,7 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
Expand Down Expand Up @@ -144,6 +145,12 @@ void setup() throws Exception {
workerConfig.setBrokerClientAuthenticationEnabled(true);
workerConfig.setTlsEnabled(true);
workerConfig.setUseTls(true);
File packagePath = new File(
PulsarSink.class.getProtectionDomain().getCodeSource().getLocation().getPath()).getParentFile();
List<String> urlPatterns =
List.of(packagePath.toURI() + ".*");
workerConfig.setAdditionalEnabledConnectorUrlPatterns(urlPatterns);
workerConfig.setAdditionalEnabledFunctionsUrlPatterns(urlPatterns);
fnWorkerServices[i] = WorkerServiceLoader.load(workerConfig);

configurations[i] = config;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
Expand Down Expand Up @@ -296,6 +297,11 @@ private PulsarWorkerService createPulsarFunctionWorker(ServiceConfiguration conf
workerConfig.setAuthenticationEnabled(true);
workerConfig.setAuthorizationEnabled(true);

List<String> urlPatterns =
List.of(getPulsarApiExamplesJar().getParentFile().toURI() + ".*", "http://127\\.0\\.0\\.1:.*");
workerConfig.setAdditionalEnabledConnectorUrlPatterns(urlPatterns);
workerConfig.setAdditionalEnabledFunctionsUrlPatterns(urlPatterns);

PulsarWorkerService workerService = new PulsarWorkerService();
workerService.init(workerConfig, null, false);
return workerService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,18 @@ public class WorkerConfig implements Serializable, PulsarConfiguration {
doc = "The directory where nar packages are extractors"
)
private String narExtractionDirectory = NarClassLoader.DEFAULT_NAR_EXTRACTION_DIR;
@FieldContext(
category = CATEGORY_CONNECTORS,
doc = "Whether to enable referencing connectors directory files by file url in connector (sink/source) "
+ "creation. Default is true."
)
private Boolean enableReferencingConnectorDirectoryFiles = true;
@FieldContext(
category = CATEGORY_FUNCTIONS,
doc = "Regex patterns for enabling creation of connectors by referencing packages in matching http/https "
+ "urls."
)
private List<String> additionalEnabledConnectorUrlPatterns = new ArrayList<>();
@FieldContext(
category = CATEGORY_CONNECTORS,
doc = "Enables extended validation for connector config with fine-grain annotation based validation "
Expand All @@ -269,6 +281,18 @@ public class WorkerConfig implements Serializable, PulsarConfiguration {
doc = "The path to the location to locate builtin functions"
)
private String functionsDirectory = "./functions";
@FieldContext(
category = CATEGORY_FUNCTIONS,
doc = "Whether to enable referencing functions directory files by file url in functions creation. "
+ "Default is true."
)
private Boolean enableReferencingFunctionsDirectoryFiles = true;
@FieldContext(
category = CATEGORY_FUNCTIONS,
doc = "Regex patterns for enabling creation of functions by referencing packages in matching http/https "
+ "urls."
)
private List<String> additionalEnabledFunctionsUrlPatterns = new ArrayList<>();
@FieldContext(
category = CATEGORY_FUNC_METADATA_MNG,
doc = "The Pulsar topic used for storing function metadata"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@
import com.google.common.io.MoreFiles;
import com.google.common.io.RecursiveDeleteOption;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
Expand Down Expand Up @@ -82,18 +82,21 @@ public class FunctionActioner {
private final ConnectorsManager connectorsManager;
private final FunctionsManager functionsManager;
private final PulsarAdmin pulsarAdmin;
private final PackageUrlValidator packageUrlValidator;

public FunctionActioner(WorkerConfig workerConfig,
RuntimeFactory runtimeFactory,
Namespace dlogNamespace,
ConnectorsManager connectorsManager,
FunctionsManager functionsManager, PulsarAdmin pulsarAdmin) {
FunctionsManager functionsManager, PulsarAdmin pulsarAdmin,
PackageUrlValidator packageUrlValidator) {
this.workerConfig = workerConfig;
this.runtimeFactory = runtimeFactory;
this.dlogNamespace = dlogNamespace;
this.connectorsManager = connectorsManager;
this.functionsManager = functionsManager;
this.pulsarAdmin = pulsarAdmin;
this.packageUrlValidator = packageUrlValidator;
}


Expand All @@ -108,30 +111,13 @@ public void startFunction(FunctionRuntimeInfo functionRuntimeInfo) {

String packageFile;

String pkgLocation = functionMetaData.getPackageLocation().getPackagePath();
boolean isPkgUrlProvided = isFunctionPackageUrlSupported(pkgLocation);
Function.PackageLocationMetaData pkgLocation = functionMetaData.getPackageLocation();

if (runtimeFactory.externallyManaged()) {
packageFile = pkgLocation;
packageFile = pkgLocation.getPackagePath();
} else {
if (isPkgUrlProvided && pkgLocation.startsWith(FILE)) {
URL url = new URL(pkgLocation);
File pkgFile = new File(url.toURI());
packageFile = pkgFile.getAbsolutePath();
} else if (FunctionCommon.isFunctionCodeBuiltin(functionDetails)) {
File pkgFile = getBuiltinArchive(FunctionDetails.newBuilder(functionMetaData.getFunctionDetails()));
packageFile = pkgFile.getAbsolutePath();
} else {
File pkgDir = new File(workerConfig.getDownloadDirectory(),
getDownloadPackagePath(functionMetaData, instanceId));
pkgDir.mkdirs();
File pkgFile = new File(
pkgDir,
new File(getDownloadFileName(functionMetaData.getFunctionDetails(),
functionMetaData.getPackageLocation())).getName());
downloadFile(pkgFile, isPkgUrlProvided, functionMetaData, instanceId);
packageFile = pkgFile.getAbsolutePath();
}
packageFile = getPackageFile(functionMetaData, functionDetails, instanceId, pkgLocation,
InstanceUtils.calculateSubjectType(functionDetails));
}

// Setup for batch sources if necessary
Expand All @@ -150,6 +136,39 @@ public void startFunction(FunctionRuntimeInfo functionRuntimeInfo) {
}
}

private String getPackageFile(FunctionMetaData functionMetaData, FunctionDetails functionDetails, int instanceId,
Function.PackageLocationMetaData pkgLocation,
FunctionDetails.ComponentType componentType)
throws URISyntaxException, IOException, ClassNotFoundException, PulsarAdminException {
String packagePath = pkgLocation.getPackagePath();
boolean isPkgUrlProvided = isFunctionPackageUrlSupported(packagePath);
String packageFile;
if (isPkgUrlProvided && packagePath.startsWith(FILE)) {
if (!packageUrlValidator.isValidPackageUrl(componentType, packagePath)) {
throw new IllegalArgumentException("Package URL " + packagePath + " is not valid");
}
URL url = new URL(packagePath);
File pkgFile = new File(url.toURI());
packageFile = pkgFile.getAbsolutePath();
} else if (FunctionCommon.isFunctionCodeBuiltin(functionDetails, componentType)) {
File pkgFile = getBuiltinArchive(
componentType,
FunctionDetails.newBuilder(functionMetaData.getFunctionDetails()));
packageFile = pkgFile.getAbsolutePath();
} else {
File pkgDir = new File(workerConfig.getDownloadDirectory(),
getDownloadPackagePath(functionMetaData, instanceId));
pkgDir.mkdirs();
File pkgFile = new File(
pkgDir,
new File(getDownloadFileName(functionMetaData.getFunctionDetails(),
pkgLocation)).getName());
downloadFile(pkgFile, isPkgUrlProvided, functionMetaData, instanceId, pkgLocation, componentType);
packageFile = pkgFile.getAbsolutePath();
}
return packageFile;
}

RuntimeSpawner getRuntimeSpawner(Function.Instance instance, String packageFile) {
FunctionMetaData functionMetaData = instance.getFunctionMetaData();
int instanceId = instance.getInstanceId();
Expand Down Expand Up @@ -198,7 +217,9 @@ InstanceConfig createInstanceConfig(FunctionDetails functionDetails, Function.Fu
}

private void downloadFile(File pkgFile, boolean isPkgUrlProvided, FunctionMetaData functionMetaData,
int instanceId) throws FileNotFoundException, IOException, PulsarAdminException {
int instanceId, Function.PackageLocationMetaData pkgLocation,
FunctionDetails.ComponentType componentType)
throws IOException, PulsarAdminException {

FunctionDetails details = functionMetaData.getFunctionDetails();
File pkgDir = pkgFile.getParentFile();
Expand All @@ -214,14 +235,17 @@ private void downloadFile(File pkgFile, boolean isPkgUrlProvided, FunctionMetaDa
pkgDir,
pkgFile.getName() + "." + instanceId + "." + UUID.randomUUID());
} while (tempPkgFile.exists() || !tempPkgFile.createNewFile());
String pkgLocationPath = functionMetaData.getPackageLocation().getPackagePath();
String pkgLocationPath = pkgLocation.getPackagePath();
boolean downloadFromHttp = isPkgUrlProvided && pkgLocationPath.startsWith(HTTP);
boolean downloadFromPackageManagementService = isPkgUrlProvided && hasPackageTypePrefix(pkgLocationPath);
log.info("{}/{}/{} Function package file {} will be downloaded from {}", tempPkgFile, details.getTenant(),
details.getNamespace(), details.getName(),
downloadFromHttp ? pkgLocationPath : functionMetaData.getPackageLocation());
downloadFromHttp ? pkgLocationPath : pkgLocation);

if (downloadFromHttp) {
if (!packageUrlValidator.isValidPackageUrl(componentType, pkgLocationPath)) {
throw new IllegalArgumentException("Package URL " + pkgLocationPath + " is not valid");
}
FunctionCommon.downloadFromHttpUrl(pkgLocationPath, tempPkgFile);
} else if (downloadFromPackageManagementService) {
getPulsarAdmin().packages().download(pkgLocationPath, tempPkgFile.getPath());
Expand All @@ -248,7 +272,7 @@ private void downloadFile(File pkgFile, boolean isPkgUrlProvided, FunctionMetaDa
} catch (FileAlreadyExistsException faee) {
// file already exists
log.warn("Function package has been downloaded from {} and saved at {}",
functionMetaData.getPackageLocation(), pkgFile);
pkgLocation, pkgFile);
}
} finally {
tempPkgFile.delete();
Expand Down Expand Up @@ -486,8 +510,9 @@ private String getDownloadPackagePath(FunctionMetaData functionMetaData, int ins
File.separatorChar);
}

private File getBuiltinArchive(FunctionDetails.Builder functionDetails) throws IOException, ClassNotFoundException {
if (functionDetails.hasSource()) {
private File getBuiltinArchive(FunctionDetails.ComponentType componentType, FunctionDetails.Builder functionDetails)
throws IOException, ClassNotFoundException {
if (componentType == FunctionDetails.ComponentType.SOURCE && functionDetails.hasSource()) {
SourceSpec sourceSpec = functionDetails.getSource();
if (!StringUtils.isEmpty(sourceSpec.getBuiltin())) {
Connector connector = connectorsManager.getConnector(sourceSpec.getBuiltin());
Expand All @@ -502,7 +527,7 @@ private File getBuiltinArchive(FunctionDetails.Builder functionDetails) throws I
}
}

if (functionDetails.hasSink()) {
if (componentType == FunctionDetails.ComponentType.SINK && functionDetails.hasSink()) {
SinkSpec sinkSpec = functionDetails.getSink();
if (!StringUtils.isEmpty(sinkSpec.getBuiltin())) {
Connector connector = connectorsManager.getConnector(sinkSpec.getBuiltin());
Expand All @@ -518,7 +543,8 @@ private File getBuiltinArchive(FunctionDetails.Builder functionDetails) throws I
}
}

if (!StringUtils.isEmpty(functionDetails.getBuiltin())) {
if (componentType == FunctionDetails.ComponentType.FUNCTION
&& !StringUtils.isEmpty(functionDetails.getBuiltin())) {
return functionsManager.getFunctionArchive(functionDetails.getBuiltin()).toFile();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,8 @@ public FunctionRuntimeManager(WorkerConfig workerConfig, PulsarWorkerService wor
functionAuthProvider, runtimeCustomizer);

this.functionActioner = new FunctionActioner(this.workerConfig, runtimeFactory,
dlogNamespace, connectorsManager, functionsManager, workerService.getBrokerAdmin());
dlogNamespace, connectorsManager, functionsManager, workerService.getBrokerAdmin(),
workerService.getPackageUrlValidator());

this.membershipManager = membershipManager;
this.functionMetaDataManager = functionMetaDataManager;
Expand Down
Loading

0 comments on commit b107387

Please sign in to comment.