From 570d59cc5c6faad193f20d76aa07417f7d645b0b Mon Sep 17 00:00:00 2001 From: Boyang Jerry Peng Date: Tue, 26 Dec 2017 17:32:52 -0800 Subject: [PATCH] Initial implementation of Pulsar Function Worker (#47) --- pulsar-functions/bin/pulsar-functions | 2 +- pulsar-functions/conf/client.conf | 2 +- pulsar-functions/conf/example.yml | 2 +- pulsar-functions/conf/log4j.shell.properties | 2 +- pulsar-functions/pom.xml | 2 +- .../ThreadFunctionContainerBenchmark.java | 2 +- pulsar-functions/runtime/pom.xml | 75 +++- .../apache/pulsar/admin/cli/CmdFunctions.java | 6 +- .../client/admin/internal/FunctionsImpl.java | 13 +- .../pulsar/functions/fs/FunctionConfig.java | 2 +- .../runtime/spawner/LimitsConfig.java | 4 +- .../functions/runtime/spawner/Spawner.java | 4 +- .../runtime/worker/FunctionState.java | 210 +++++++++ .../runtime/worker/FunctionStateManager.java | 278 ++++++++++++ .../functions/runtime/worker/Utils.java | 70 +++ .../functions/runtime/worker/Worker.java | 60 +++ .../runtime/worker/WorkerConfig.java | 86 ++++ .../worker/request/DeregisterRequest.java | 42 ++ .../runtime/worker/request/RequestResult.java | 56 +++ .../worker/request/ServiceRequest.java | 104 +++++ .../worker/request/ServiceRequestManager.java | 68 +++ .../runtime/worker/request/UpdateRequest.java | 39 ++ .../runtime/worker/rest/BaseApiResource.java | 61 +++ .../worker/rest/ConfigurationResource.java | 48 ++ .../worker/rest/FunctionStateListener.java | 92 ++++ .../runtime/worker/rest/Resources.java | 45 ++ .../runtime/worker/rest/RestUtils.java | 70 +++ .../runtime/worker/rest/WorkerServer.java | 94 ++++ .../worker/rest/api/v1/ApiV1Resource.java | 415 ++++++++++++++++++ .../runtime/worker/rest/api/v1/Utils.java | 134 ++++++ .../rest/api/v1/dlog/DLInputStream.java | 145 ++++++ .../rest/api/v1/dlog/DLOutputStream.java | 73 +++ .../worker/FunctionStateManagerTest.java | 4 + .../functions/runtime/worker/WorkerTest.java | 61 +++ 34 files changed, 2345 insertions(+), 26 deletions(-) create mode 100644 pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/worker/FunctionState.java create mode 100644 pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/worker/FunctionStateManager.java create mode 100644 pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/worker/Utils.java create mode 100644 pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/worker/Worker.java create mode 100644 pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/worker/WorkerConfig.java create mode 100644 pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/worker/request/DeregisterRequest.java create mode 100644 pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/worker/request/RequestResult.java create mode 100644 pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/worker/request/ServiceRequest.java create mode 100644 pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/worker/request/ServiceRequestManager.java create mode 100644 pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/worker/request/UpdateRequest.java create mode 100644 pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/worker/rest/BaseApiResource.java create mode 100644 pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/worker/rest/ConfigurationResource.java create mode 100644 pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/worker/rest/FunctionStateListener.java create mode 100644 pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/worker/rest/Resources.java create mode 100644 pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/worker/rest/RestUtils.java create mode 100644 pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/worker/rest/WorkerServer.java create mode 100644 pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/worker/rest/api/v1/ApiV1Resource.java create mode 100644 pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/worker/rest/api/v1/Utils.java create mode 100644 pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/worker/rest/api/v1/dlog/DLInputStream.java create mode 100644 pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/worker/rest/api/v1/dlog/DLOutputStream.java create mode 100644 pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/worker/FunctionStateManagerTest.java create mode 100644 pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/worker/WorkerTest.java diff --git a/pulsar-functions/bin/pulsar-functions b/pulsar-functions/bin/pulsar-functions index 8898f9c456baa..a0b770dd04ae8 100755 --- a/pulsar-functions/bin/pulsar-functions +++ b/pulsar-functions/bin/pulsar-functions @@ -22,7 +22,7 @@ BINDIR=$(dirname "$0") PULSAR_HOME=`cd $BINDIR/..;pwd` DEFAULT_CLIENT_CONF=$PULSAR_HOME/conf/client.conf -DEFAULT_LOG_CONF=$PULSAR_HOME/conf/log4j.properties +DEFAULT_LOG_CONF=$PULSAR_HOME/conf/log4j.shell.properties if [ -f "$PULSAR_HOME/conf/pulsar_tools_env.sh" ] then diff --git a/pulsar-functions/conf/client.conf b/pulsar-functions/conf/client.conf index 5afa987e16b0b..21e6e31cbfb85 100644 --- a/pulsar-functions/conf/client.conf +++ b/pulsar-functions/conf/client.conf @@ -18,7 +18,7 @@ # # Pulsar Client configuration -webServiceUrl=http://localhost:8080/ +webServiceUrl=http://localhost:8001/ brokerServiceUrl=pulsar://localhost:6650/ #authPlugin= #authParams= diff --git a/pulsar-functions/conf/example.yml b/pulsar-functions/conf/example.yml index 8726dfde4c843..90f076402f796 100644 --- a/pulsar-functions/conf/example.yml +++ b/pulsar-functions/conf/example.yml @@ -18,6 +18,6 @@ # tenant: "test" -nameSpace: "test" +namespace: "test-namespace" name: "example" className: "org.apache.pulsar.functions.api.examples.ExclamationFunction" diff --git a/pulsar-functions/conf/log4j.shell.properties b/pulsar-functions/conf/log4j.shell.properties index 146d153fcac0c..02e034d9ea81c 100644 --- a/pulsar-functions/conf/log4j.shell.properties +++ b/pulsar-functions/conf/log4j.shell.properties @@ -22,7 +22,7 @@ # DEFAULT: console appender only # Define some default values that can be overridden by system properties -bookkeeper.root.logger=ERROR,CONSOLE +bookkeeper.root.logger=INFO,CONSOLE log4j.rootLogger=${bookkeeper.root.logger} diff --git a/pulsar-functions/pom.xml b/pulsar-functions/pom.xml index 00aef567adac0..943bed8f1b5c1 100644 --- a/pulsar-functions/pom.xml +++ b/pulsar-functions/pom.xml @@ -19,7 +19,7 @@ --> + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 4.0.0 pom diff --git a/pulsar-functions/runtime-benchmark/src/main/java/org/apache/pulsar/functions/runtime/benchmark/ThreadFunctionContainerBenchmark.java b/pulsar-functions/runtime-benchmark/src/main/java/org/apache/pulsar/functions/runtime/benchmark/ThreadFunctionContainerBenchmark.java index b96a12c268c9d..2096757772084 100644 --- a/pulsar-functions/runtime-benchmark/src/main/java/org/apache/pulsar/functions/runtime/benchmark/ThreadFunctionContainerBenchmark.java +++ b/pulsar-functions/runtime-benchmark/src/main/java/org/apache/pulsar/functions/runtime/benchmark/ThreadFunctionContainerBenchmark.java @@ -70,7 +70,7 @@ public void prepare() { fnConfig.setSinkTopic("test-sink"); fnConfig.setSourceTopic("test-source"); fnConfig.setTenant("test-tenant"); - fnConfig.setNameSpace("test-namespace"); + fnConfig.setNamespace("test-namespace"); fnConfig.setVersion(UUID.randomUUID().toString()); JavaInstanceConfig config = diff --git a/pulsar-functions/runtime/pom.xml b/pulsar-functions/runtime/pom.xml index d273113b6ff7f..7b76f4aef2aa4 100644 --- a/pulsar-functions/runtime/pom.xml +++ b/pulsar-functions/runtime/pom.xml @@ -21,6 +21,15 @@ 4.0.0 + + + 9.4.8.v20171121 + 2.26 + 0.5.0 + 2.8.2 + 2.9.2 + + org.apache.pulsar pulsar-functions @@ -58,19 +67,19 @@ com.fasterxml.jackson.dataformat jackson-dataformat-yaml - 2.9.2 + ${jackson.version} com.fasterxml.jackson.core jackson-annotations - 2.9.2 + ${jackson.version} com.fasterxml.jackson.core jackson-databind - 2.9.2 + ${jackson.version} @@ -86,20 +95,72 @@ provided + + org.glassfish.jersey.media + jersey-media-json-jackson + ${jersery.version} + + org.glassfish.jersey.core - jersey-client + jersey-server + ${jersery.version} - org.glassfish.jersey.media - jersey-media-json-jackson + org.glassfish.jersey.containers + jersey-container-servlet + ${jersery.version} + + + + org.glassfish.jersey.containers + jersey-container-servlet-core + ${jersery.version} + + + + org.glassfish.jersey.inject + jersey-hk2 + ${jersery.version} + + + + org.glassfish.jersey.core + jersey-client + ${jersery.version} org.glassfish.jersey.media jersey-media-multipart - 2.23 + ${jersery.version} + + + + org.eclipse.jetty + jetty-server + ${jetty.version} + + + + org.eclipse.jetty + jetty-servlet + ${jetty.version} + + + + org.apache.distributedlog + distributedlog-core + ${dlog.version} + jar + shaded + + + + com.google.code.gson + gson + ${gson.version} diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java index dc18ffd446845..4b4714dc8704c 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java @@ -133,7 +133,7 @@ class GetFunction extends FunctionsCommand { @Override void run_functions_cmd() throws Exception { PulsarFunctionsAdmin a = (PulsarFunctionsAdmin)admin; - print(a.functions().getFunction(functionConfig.getTenant(), functionConfig.getNameSpace(), functionConfig.getName())); + print(a.functions().getFunction(functionConfig.getTenant(), functionConfig.getNamespace(), functionConfig.getName())); } } @@ -142,7 +142,7 @@ class DeleteFunction extends FunctionsCommand { @Override void run_functions_cmd() throws Exception { PulsarFunctionsAdmin a = (PulsarFunctionsAdmin)admin; - a.functions().deleteFunction(functionConfig.getTenant(), functionConfig.getNameSpace(), functionConfig.getName()); + a.functions().deleteFunction(functionConfig.getTenant(), functionConfig.getNamespace(), functionConfig.getName()); print("Deleted successfully"); } } @@ -162,7 +162,7 @@ class ListFunctions extends FunctionsCommand { @Override void run_functions_cmd() throws Exception { PulsarFunctionsAdmin a = (PulsarFunctionsAdmin)admin; - print(a.functions().getFunctions(functionConfig.getTenant(), functionConfig.getNameSpace())); + print(a.functions().getFunctions(functionConfig.getTenant(), functionConfig.getNamespace())); } } diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java index 1c1759e090087..381b9bfc26209 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java @@ -25,12 +25,13 @@ import org.apache.pulsar.functions.fs.FunctionConfig; import org.glassfish.jersey.media.multipart.FormDataBodyPart; import org.glassfish.jersey.media.multipart.FormDataMultiPart; +import org.glassfish.jersey.media.multipart.file.FileDataBodyPart; import javax.ws.rs.client.Entity; import javax.ws.rs.client.WebTarget; import javax.ws.rs.core.GenericType; import javax.ws.rs.core.MediaType; -import java.io.FileInputStream; +import java.io.File; import java.util.List; public class FunctionsImpl extends BaseResource implements Functions { @@ -65,7 +66,9 @@ public FunctionConfig getFunction(String tenant, String namespace, String functi public void createFunction(FunctionConfig functionConfig, String fileName) throws PulsarAdminException { try { final FormDataMultiPart mp = new FormDataMultiPart(); - mp.bodyPart(new FormDataBodyPart("data", new FileInputStream(fileName), MediaType.APPLICATION_OCTET_STREAM_TYPE)); + + mp.bodyPart(new FileDataBodyPart("data", new File(fileName), MediaType.APPLICATION_OCTET_STREAM_TYPE)); + mp.bodyPart(new FormDataBodyPart("sourceTopic", functionConfig.getSourceTopic(), MediaType.APPLICATION_JSON_TYPE)); mp.bodyPart(new FormDataBodyPart("sinkTopic", functionConfig.getSinkTopic(), @@ -76,7 +79,7 @@ public void createFunction(FunctionConfig functionConfig, String fileName) throw MediaType.APPLICATION_JSON_TYPE)); mp.bodyPart(new FormDataBodyPart("className", functionConfig.getClassName(), MediaType.APPLICATION_JSON_TYPE)); - request(functions.path(functionConfig.getTenant()).path(functionConfig.getNameSpace()).path(functionConfig.getName())) + request(functions.path(functionConfig.getTenant()).path(functionConfig.getNamespace()).path(functionConfig.getName())) .post(Entity.entity(mp, MediaType.MULTIPART_FORM_DATA), ErrorData.class); } catch (Exception e) { throw getApiException(e); @@ -98,7 +101,7 @@ public void updateFunction(FunctionConfig functionConfig, String fileName) throw try { final FormDataMultiPart mp = new FormDataMultiPart(); if (fileName != null) { - mp.bodyPart(new FormDataBodyPart("data", new FileInputStream(fileName), MediaType.APPLICATION_OCTET_STREAM_TYPE)); + mp.bodyPart(new FileDataBodyPart("data", new File(fileName), MediaType.APPLICATION_OCTET_STREAM_TYPE)); } if (functionConfig.getSourceTopic() != null) { mp.bodyPart(new FormDataBodyPart("sourceTopic", functionConfig.getSourceTopic(), @@ -120,7 +123,7 @@ public void updateFunction(FunctionConfig functionConfig, String fileName) throw mp.bodyPart(new FormDataBodyPart("className", functionConfig.getClassName(), MediaType.APPLICATION_JSON_TYPE)); } - request(functions.path(functionConfig.getTenant()).path(functionConfig.getNameSpace()).path(functionConfig.getName())) + request(functions.path(functionConfig.getTenant()).path(functionConfig.getNamespace()).path(functionConfig.getName())) .put(Entity.entity(mp, MediaType.MULTIPART_FORM_DATA), ErrorData.class); } catch (Exception e) { throw getApiException(e); diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/fs/FunctionConfig.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/fs/FunctionConfig.java index 3c1c7ff2e5d66..645a611bd5f3f 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/fs/FunctionConfig.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/fs/FunctionConfig.java @@ -40,7 +40,7 @@ public class FunctionConfig { // tenant that the function resides in private String tenant; // namespace that the function belongs to - private String nameSpace; + private String namespace; // function name private String name; // Function version diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/spawner/LimitsConfig.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/spawner/LimitsConfig.java index efa7c74799677..53a36246ef465 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/spawner/LimitsConfig.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/spawner/LimitsConfig.java @@ -30,7 +30,7 @@ * This represents the config related to the resource limits of function calls */ public class LimitsConfig { - private int timeBudgetInMs; - private int maxMemory; + private int maxTimeMs; + private int maxMemoryMb; private int maxBufferedTuples; } diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/spawner/Spawner.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/spawner/Spawner.java index 57ea1cf58ab01..b1278eba43d7c 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/spawner/Spawner.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/spawner/Spawner.java @@ -87,8 +87,8 @@ private JavaInstanceConfig createJavaInstanceConfig() { javaInstanceConfig.setFunctionConfig(assignmentInfo.getFunctionConfig()); javaInstanceConfig.setFunctionId(assignmentInfo.getFunctionId()); javaInstanceConfig.setFunctionVersion(assignmentInfo.getFunctionVersion()); - javaInstanceConfig.setTimeBudgetInMs(limitsConfig.getTimeBudgetInMs()); - javaInstanceConfig.setMaxMemory(limitsConfig.getMaxMemory()); + javaInstanceConfig.setTimeBudgetInMs(limitsConfig.getMaxTimeMs()); + javaInstanceConfig.setMaxMemory(limitsConfig.getMaxMemoryMb()); return javaInstanceConfig; } } diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/worker/FunctionState.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/worker/FunctionState.java new file mode 100644 index 0000000000000..ccc5f709afa83 --- /dev/null +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/worker/FunctionState.java @@ -0,0 +1,210 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.functions.runtime.worker; + +import com.google.gson.Gson; + +import java.io.Serializable; + +public class FunctionState implements Serializable, Cloneable{ + + private String namespace; + private String tenant; + private String name; + private String packageLocation; + private long maxMemoryMb; + private long maxCpuCores; + private long maxTimeMs; + private String runtime; + private long version; + private long createTime; + private String workerId; + private String sourceTopic; + private String sinkTopic; + private String inputSerdeClassName; + private String outputSerdeClassName; + private String className; + + public String getInputSerdeClassName() { + return inputSerdeClassName; + } + + public void setInputSerdeClassName(String inputSerdeClassName) { + this.inputSerdeClassName = inputSerdeClassName; + } + + public String getOutputSerdeClassName() { + return outputSerdeClassName; + } + + public void setOutputSerdeClassName(String outputSerdeClassName) { + this.outputSerdeClassName = outputSerdeClassName; + } + + public String getClassName() { + return className; + } + + public void setClassName(String className) { + this.className = className; + } + + public String getSourceTopic() { + return sourceTopic; + } + + public void setSourceTopic(String sourceTopic) { + this.sourceTopic = sourceTopic; + } + + public String getSinkTopic() { + return sinkTopic; + } + + public void setSinkTopic(String sinkTopic) { + this.sinkTopic = sinkTopic; + } + + public String getNamespace() { + return namespace; + } + + public void setNamespace(String namespace) { + this.namespace = namespace; + } + + public String getTenant() { + return tenant; + } + + public void setTenant(String tenant) { + this.tenant = tenant; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public String getPackageLocation() { + return packageLocation; + } + + public void setPackageLocation(String packageLocation) { + this.packageLocation = packageLocation; + } + + public long getMaxMemoryMb() { + return maxMemoryMb; + } + + public void setMaxMemoryMb(long maxMemoryMb) { + this.maxMemoryMb = maxMemoryMb; + } + + public long getMaxCpuCores() { + return maxCpuCores; + } + + public void setMaxCpuCores(long maxCpuCores) { + this.maxCpuCores = maxCpuCores; + } + + public long getMaxTimeMs() { + return maxTimeMs; + } + + public void setMaxTimeMs(long maxTimeMs) { + this.maxTimeMs = maxTimeMs; + } + + public String getRuntime() { + return runtime; + } + + public void setRuntime(String runtime) { + this.runtime = runtime; + } + + public long getVersion() { + return version; + } + + public void setVersion(long version) { + this.version = version; + } + + public void incrementVersion() { + this.version = this.version + 1; + } + + public long getCreateTime() { + return createTime; + } + + public void setCreateTime(long createTime) { + this.createTime = createTime; + } + + public String getWorkerId() { + return workerId; + } + + public void setWorkerId(String workerId) { + this.workerId = workerId; + } + + public String toJson() { + return new Gson().toJson(this); + } + + public FunctionState fromJson(String json) { + return new Gson().fromJson(json, this.getClass()); + } + + @Override + public Object clone() { + return this.fromJson(this.toJson()); + } + + @Override + public String toString() { + return "FunctionState{" + + "namespace='" + namespace + '\'' + + ", tenant='" + tenant + '\'' + + ", name='" + name + '\'' + + ", packageLocation='" + packageLocation + '\'' + + ", maxMemoryMb=" + maxMemoryMb + + ", maxCpuCores=" + maxCpuCores + + ", maxTimeMs=" + maxTimeMs + + ", runtime='" + runtime + '\'' + + ", version=" + version + + ", createTime=" + createTime + + ", workerId='" + workerId + '\'' + + ", sourceTopic='" + sourceTopic + '\'' + + ", sinkTopic='" + sinkTopic + '\'' + + ", inputSerdeClassName='" + inputSerdeClassName + '\'' + + ", outputSerdeClassName='" + outputSerdeClassName + '\'' + + ", className='" + className + '\'' + + '}'; + } +} diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/worker/FunctionStateManager.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/worker/FunctionStateManager.java new file mode 100644 index 0000000000000..0157fd8ce4a66 --- /dev/null +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/worker/FunctionStateManager.java @@ -0,0 +1,278 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.functions.runtime.worker; + +import org.apache.pulsar.client.api.*; +import org.apache.pulsar.functions.runtime.worker.request.DeregisterRequest; +import org.apache.pulsar.functions.runtime.worker.request.RequestResult; +import org.apache.pulsar.functions.runtime.worker.request.ServiceRequest; +import org.apache.pulsar.functions.runtime.worker.request.ServiceRequestManager; +import org.apache.pulsar.functions.runtime.worker.request.UpdateRequest; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collection; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; + +public class FunctionStateManager { + + private static final Logger LOG = LoggerFactory.getLogger(FunctionStateManager.class); + + // tenant -> namespace -> (function name, FunctionState) + private final Map>> functionStateMap = new ConcurrentHashMap<>(); + + // A map in which the key is the service request id and value is the service request + private final Map pendingServiceRequests = new ConcurrentHashMap<>(); + + private final ServiceRequestManager serviceRequestManager; + + private final WorkerConfig workerConfig; + + public FunctionStateManager(WorkerConfig workerConfig) throws PulsarClientException { + this(workerConfig, new ServiceRequestManager(workerConfig)); + } + + public FunctionStateManager(WorkerConfig workerConfig, ServiceRequestManager serviceRequestManager) { + this.workerConfig = workerConfig; + this.serviceRequestManager = serviceRequestManager; + } + + public FunctionState getFunction(String tenant, String namespace, String functionName) { + return this.functionStateMap.get(tenant).get(namespace).get(functionName); + } + + public Collection listFunction(String tenant, String namespace) { + List ret = new LinkedList<>(); + + if (!this.functionStateMap.containsKey(tenant)) { + return ret; + } + + if (!this.functionStateMap.get(tenant).containsKey(namespace)) { + return ret; + } + for (FunctionState entry : this.functionStateMap.get(tenant).get(namespace).values()) { + ret.add(entry.getName()); + } + return ret; + } + + public CompletableFuture updateFunction(FunctionState functionState) { + + long version = 0; + + if (!this.functionStateMap.containsKey(functionState.getTenant())) { + this.functionStateMap.put(functionState.getTenant(), new ConcurrentHashMap<>()); + } + + if (!this.functionStateMap.get(functionState.getTenant()).containsKey(functionState.getNamespace())) { + this.functionStateMap.get(functionState.getTenant()) + .put(functionState.getNamespace(), new ConcurrentHashMap<>()); + } + + if (this.functionStateMap.get(functionState.getTenant()).get(functionState.getNamespace()) + .containsKey(functionState.getName())) { + version = this.functionStateMap.get(functionState.getTenant()) + .get(functionState.getNamespace()).get(functionState.getName()).getVersion() + 1; + } + + functionState.setVersion(version); + + UpdateRequest updateRequest = UpdateRequest.of(this.workerConfig.getWorkerId(), functionState); + + return submit(updateRequest); + } + + public CompletableFuture deregisterFunction(String tenant, String namespace, String functionName) { + FunctionState functionState + = (FunctionState) this.functionStateMap.get(tenant).get(namespace).get(functionName).clone(); + + functionState.incrementVersion(); + + DeregisterRequest deregisterRequest = DeregisterRequest.of(this.workerConfig.getWorkerId(), functionState); + + return submit(deregisterRequest); + } + + public boolean containsFunction(FunctionState functionState) { + return containsFunction(functionState.getTenant(), functionState.getNamespace(), functionState.getName()); + } + + public boolean containsFunction(String tenant, String namespace, String functionName) { + if (this.functionStateMap.containsKey(tenant)) { + if (this.functionStateMap.get(tenant).containsKey(namespace)) { + if (this.functionStateMap.get(tenant).get(namespace).containsKey(functionName)) { + return true; + } + } + } + return false; + } + + private CompletableFuture submit(ServiceRequest serviceRequest) { + CompletableFuture messageIdCompletableFuture = this.serviceRequestManager.submitRequest(serviceRequest); + + serviceRequest.setCompletableFutureRequestMessageId(messageIdCompletableFuture); + CompletableFuture requestResultCompletableFuture = new CompletableFuture<>(); + + serviceRequest.setRequestResultCompletableFuture(requestResultCompletableFuture); + + this.pendingServiceRequests.put(serviceRequest.getRequestId(), serviceRequest); + + return requestResultCompletableFuture; + } + + /** + * Complete requests that this worker has pending + * @param serviceRequest + * @param isSuccess + * @param message + */ + private void completeRequest(ServiceRequest serviceRequest, boolean isSuccess, String message) { + ServiceRequest pendingServiceRequest + = this.pendingServiceRequests.getOrDefault(serviceRequest.getRequestId(), null); + if (pendingServiceRequest != null) { + RequestResult requestResult = new RequestResult(); + requestResult.setSuccess(isSuccess); + requestResult.setMessage(message); + requestResult.setRequestDetails(serviceRequest); + pendingServiceRequest.getRequestResultCompletableFuture().complete(requestResult); + } + } + + private void completeRequest(ServiceRequest serviceRequest, boolean isSuccess) { + completeRequest(serviceRequest, isSuccess, null); + } + + public void proccessDeregister(DeregisterRequest deregisterRequest) { + + // check if the request is valid + if (!deregisterRequest.isValidRequest()) { + completeRequest(deregisterRequest, false, "Received invalid request"); + return; + } + + FunctionState deregisterRequestFs = deregisterRequest.getFunctionState(); + + LOG.debug("Process deregister request: {}", deregisterRequest); + + // Check if we still have this function. Maybe already deleted by someone else + if(this.containsFunction(deregisterRequestFs)) { + // check if request is outdated + if (!isRequestOutdated(deregisterRequest)) { + // Check if this worker is suppose to run the function + if (isMyRequest(deregisterRequest)) { + // stop running the function + stopFunction(deregisterRequestFs.getName()); + } + // remove function from in memory function state store + this.functionStateMap.remove(deregisterRequestFs.getName()); + completeRequest(deregisterRequest, true); + } else { + completeRequest(deregisterRequest, false, + "Request ignored because it is out of date. Please try again."); + } + } else { + // already deleted so just complete request + completeRequest(deregisterRequest, true); + } + } + + public void processUpdate(UpdateRequest updateRequest) { + + // check if the request is valid + if (!updateRequest.isValidRequest()) { + completeRequest(updateRequest, false, "Received invalid request"); + return; + } + + LOG.debug("Process update request: {}", updateRequest); + + FunctionState updateRequestFs = updateRequest.getFunctionState(); + + // Worker doesn't know about the function so far + if(!this.containsFunction(updateRequestFs)) { + // Since this is the first time worker has seen function, just put it into internal function state store + addFunctionToFunctionStateMap(updateRequestFs); + // Check if this worker is suppose to run the function + if (this.workerConfig.getWorkerId().equals(updateRequestFs.getWorkerId())) { + // start the function + startFunction(updateRequestFs.getName()); + } + completeRequest(updateRequest, true); + } else { + // The request is an update to an existing function since this worker already has a record of this function + // in its function state store + // Check if request is outdated + if (!isRequestOutdated(updateRequest)) { + // update the function state + addFunctionToFunctionStateMap(updateRequestFs); + // check if this worker should run the update + if (isMyRequest(updateRequest)) { + // Update the function + updateFunction(updateRequestFs.getName()); + } + completeRequest(updateRequest, true); + } else { + completeRequest(updateRequest, false, + "Request ignored because it is out of date. Please try again."); + } + } + } + + private void addFunctionToFunctionStateMap(FunctionState functionState) { + if (!this.functionStateMap.containsKey(functionState.getTenant())) { + this.functionStateMap.put(functionState.getTenant(), new ConcurrentHashMap<>()); + } + + if (!this.functionStateMap.get(functionState.getTenant()).containsKey(functionState.getNamespace())) { + this.functionStateMap.get(functionState.getTenant()) + .put(functionState.getNamespace(), new ConcurrentHashMap<>()); + } + this.functionStateMap.get(functionState.getTenant()) + .get(functionState.getNamespace()).put(functionState.getName(), functionState); + } + + private boolean isRequestOutdated(ServiceRequest serviceRequest) { + FunctionState requestFunctionState = serviceRequest.getFunctionState(); + FunctionState currentFunctionState = this.functionStateMap.get(requestFunctionState.getTenant()) + .get(requestFunctionState.getNamespace()).get(requestFunctionState.getName()); + return currentFunctionState.getVersion() >= requestFunctionState.getVersion(); + } + + private boolean isMyRequest(ServiceRequest serviceRequest) { + return this.workerConfig.getWorkerId().equals(serviceRequest.getFunctionState().getWorkerId()); + } + + public void startFunction(String functionName) { + LOG.info("Starting function {}....", functionName); + } + + public void updateFunction(String functionName) { + LOG.info("Updating function {}...", functionName); + } + + public void stopFunction(String functionName) { + LOG.info("Stopping function {}...", functionName); + } +} diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/worker/Utils.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/worker/Utils.java new file mode 100644 index 0000000000000..41fc8e15766b5 --- /dev/null +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/worker/Utils.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.functions.runtime.worker; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; + +public final class Utils { + + private Utils(){} + + public static Object getObject(byte[] byteArr) throws IOException, ClassNotFoundException { + Object obj = null; + ByteArrayInputStream bis = null; + ObjectInputStream ois = null; + try { + bis = new ByteArrayInputStream(byteArr); + ois = new ObjectInputStream(bis); + obj = ois.readObject(); + } finally { + if (bis != null) { + bis.close(); + } + if (ois != null) { + ois.close(); + } + } + return obj; + } + + public static byte[] toByteArray(Object obj) throws IOException { + byte[] bytes = null; + ByteArrayOutputStream bos = null; + ObjectOutputStream oos = null; + try { + bos = new ByteArrayOutputStream(); + oos = new ObjectOutputStream(bos); + oos.writeObject(obj); + oos.flush(); + bytes = bos.toByteArray(); + } finally { + if (oos != null) { + oos.close(); + } + if (bos != null) { + bos.close(); + } + } + return bytes; + } +} diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/worker/Worker.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/worker/Worker.java new file mode 100644 index 0000000000000..c6fbd084cf261 --- /dev/null +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/worker/Worker.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.functions.runtime.worker; + +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.functions.runtime.worker.rest.FunctionStateListener; +import org.apache.pulsar.functions.runtime.worker.rest.WorkerServer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class Worker { + + private static final Logger LOG = LoggerFactory.getLogger(Worker.class); + + + private WorkerConfig workerConfig; + private FunctionStateManager functionStateManager; + + public Worker(WorkerConfig workerConfig) throws PulsarClientException { + this.workerConfig = workerConfig; + this.functionStateManager = new FunctionStateManager(workerConfig); + } + + public void start() throws InterruptedException, PulsarClientException { + LOG.info("Start worker {}...", workerConfig.getWorkerId()); + + final WorkerServer workerServer = new WorkerServer(this.workerConfig, this.functionStateManager); + + FunctionStateListener functionStateListener = new FunctionStateListener(this.workerConfig, this.functionStateManager); + + Thread serverThread = new Thread(workerServer); + serverThread.setName(workerServer.getThreadName()); + Thread listenerThread = new Thread(functionStateListener); + listenerThread.setName(functionStateListener.getThreadName()); + + LOG.info("Start worker server on port {}...", workerConfig.getWorkerPort()); + serverThread.start(); + LOG.info("Start worker metadata listener..."); + listenerThread.start(); + + serverThread.join(); + listenerThread.join(); + } +} diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/worker/WorkerConfig.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/worker/WorkerConfig.java new file mode 100644 index 0000000000000..314064c288f12 --- /dev/null +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/worker/WorkerConfig.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.functions.runtime.worker; + +import java.net.URI; + +public class WorkerConfig { + + private String workerId; + private int workerPort; + private URI zookeeperUri; + private String functionMetadataTopic; + private String pulsarBrokerRootUrl; + private int numFunctionPackageReplicas; + + public int getNumFunctionPackageReplicas() { + return numFunctionPackageReplicas; + } + + public void setNumFunctionPackageReplicas(int numFunctionPackageReplicas) { + this.numFunctionPackageReplicas = numFunctionPackageReplicas; + } + + public String getFunctionMetadataTopicSubscription() { + if (this.workerId == null) { + throw new IllegalStateException("Worker Id is not set"); + } + return String.format("%s-subscription", this.workerId); + } + + public String getFunctionMetadataTopic() { + return functionMetadataTopic; + } + + public void setFunctionMetadataTopic(String functionMetadataTopic) { + this.functionMetadataTopic = functionMetadataTopic; + } + + public String getPulsarBrokerRootUrl() { + return pulsarBrokerRootUrl; + } + + public void setPulsarBrokerRootUrl(String pulsarBrokerRootUrl) { + this.pulsarBrokerRootUrl = pulsarBrokerRootUrl; + } + + public URI getZookeeperUri() { + return zookeeperUri; + } + + public void setZookeeperUri(URI zookeeperUri) { + this.zookeeperUri = zookeeperUri; + } + + public int getWorkerPort() { + return workerPort; + } + + public void setWorkerPort(int workerPort) { + this.workerPort = workerPort; + } + + public String getWorkerId() { + return workerId; + } + + public void setWorkerId(String workerId) { + this.workerId = workerId; + } +} diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/worker/request/DeregisterRequest.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/worker/request/DeregisterRequest.java new file mode 100644 index 0000000000000..3b4d8a59a31b8 --- /dev/null +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/worker/request/DeregisterRequest.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.functions.runtime.worker.request; + +import org.apache.pulsar.functions.runtime.worker.FunctionState; + +public class DeregisterRequest extends ServiceRequest{ + + private String functionName; + + public DeregisterRequest(String workerId, FunctionState functionState) { + super(workerId, functionState, ServiceRequestType.DELETE); + this.functionName = functionName; + } + + public static DeregisterRequest of(String workerId, FunctionState functionState) { + return new DeregisterRequest(workerId, functionState); + } + + @Override + public String toString() { + return "DeregisterRequest{" + + super.toString() + + "}"; + } +} diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/worker/request/RequestResult.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/worker/request/RequestResult.java new file mode 100644 index 0000000000000..c9333590f7bb4 --- /dev/null +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/worker/request/RequestResult.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.functions.runtime.worker.request; + +import com.google.gson.Gson; + +public class RequestResult { + private boolean success; + private String message; + + private ServiceRequest requestDetails; + + public boolean isSuccess() { + return success; + } + + public void setSuccess(boolean success) { + this.success = success; + } + + public String getMessage() { + return message; + } + + public void setMessage(String message) { + this.message = message; + } + + public ServiceRequest getRequestDetails() { + return requestDetails; + } + + public void setRequestDetails(ServiceRequest requestDetails) { + this.requestDetails = requestDetails; + } + + public String toJson() { + return new Gson().toJson(this); + } +} diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/worker/request/ServiceRequest.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/worker/request/ServiceRequest.java new file mode 100644 index 0000000000000..cb4aa0774e003 --- /dev/null +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/worker/request/ServiceRequest.java @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.functions.runtime.worker.request; + +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.functions.runtime.worker.FunctionState; + +import java.io.Serializable; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; + +public abstract class ServiceRequest implements Serializable{ + + private String requestId; + private ServiceRequestType serviceRequestType; + private String workerId; + private FunctionState functionState; + transient private CompletableFuture completableFutureRequestMessageId; + transient private CompletableFuture requestResultCompletableFuture; + + @Override + public String toString() { + return "ServiceRequest{" + + "requestId='" + requestId + '\'' + + ", serviceRequestType=" + serviceRequestType + + ", workerId='" + workerId + '\'' + + ", functionState=" + functionState + + '}'; + } + + public enum ServiceRequestType { + UPDATE, + DELETE + } + + public ServiceRequest(String workerId, FunctionState functionState, ServiceRequestType serviceRequestType) { + this(workerId, functionState, serviceRequestType, UUID.randomUUID().toString()); + } + + public ServiceRequest(String workerId, FunctionState functionState, + ServiceRequestType serviceRequestType, String requestId) { + this.workerId = workerId; + this.functionState = functionState; + this.serviceRequestType = serviceRequestType; + this.requestId = requestId; + } + + public ServiceRequestType getRequestType() { + return this.serviceRequestType; + } + + public String getWorkerId() { + return this.workerId; + } + + public FunctionState getFunctionState() { + return functionState; + } + + public String getRequestId() { + return requestId; + } + + public void setRequestId(String requestId) { + this.requestId = requestId; + } + + public CompletableFuture getRequestResultCompletableFuture() { + return requestResultCompletableFuture; + } + + public void setRequestResultCompletableFuture(CompletableFuture requestResultCompletableFuture) { + this.requestResultCompletableFuture = requestResultCompletableFuture; + } + + public CompletableFuture getCompletableFutureRequestMessageId() { + return completableFutureRequestMessageId; + } + + public void setCompletableFutureRequestMessageId(CompletableFuture completableFutureRequestMessageId) { + this.completableFutureRequestMessageId = completableFutureRequestMessageId; + } + + public boolean isValidRequest() { + return this.requestId != null && this.serviceRequestType != null + && this.workerId != null && this.functionState != null; + } +} diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/worker/request/ServiceRequestManager.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/worker/request/ServiceRequestManager.java new file mode 100644 index 0000000000000..a13d3300ded9e --- /dev/null +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/worker/request/ServiceRequestManager.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.functions.runtime.worker.request; + +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.functions.runtime.worker.Utils; +import org.apache.pulsar.functions.runtime.worker.WorkerConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.concurrent.CompletableFuture; + +public class ServiceRequestManager { + + private static final Logger LOG = LoggerFactory.getLogger(ServiceRequestManager.class); + + PulsarClient client; + Producer producer; + + public ServiceRequestManager(WorkerConfig workerConfig) throws PulsarClientException { + String pulsarBrokerRootUrl = workerConfig.getPulsarBrokerRootUrl(); + client = PulsarClient.create(pulsarBrokerRootUrl); + String topic = workerConfig.getFunctionMetadataTopic(); + + producer = client.createProducer(topic); + } + + public CompletableFuture submitRequest(ServiceRequest serviceRequest) { + LOG.debug("Submitting Service Request: {}", serviceRequest); + byte[] bytes; + try { + bytes = Utils.toByteArray(serviceRequest); + } catch (IOException e) { + LOG.error("error serializing request: " + serviceRequest); + throw new RuntimeException(e); + } + + CompletableFuture messageIdCompletableFuture = send(bytes); + + return messageIdCompletableFuture; + } + + public CompletableFuture send(byte[] message) { + CompletableFuture messageIdCompletableFuture = producer.sendAsync(message); + + return messageIdCompletableFuture; + } +} diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/worker/request/UpdateRequest.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/worker/request/UpdateRequest.java new file mode 100644 index 0000000000000..0759848df8a39 --- /dev/null +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/worker/request/UpdateRequest.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.functions.runtime.worker.request; + +import org.apache.pulsar.functions.runtime.worker.FunctionState; + +public class UpdateRequest extends ServiceRequest { + + private UpdateRequest(String workerId, FunctionState functionState) { + super(workerId, functionState, ServiceRequestType.UPDATE); + } + + public static UpdateRequest of(String workerId, FunctionState functionState) { + return new UpdateRequest(workerId, functionState); + } + + @Override + public String toString() { + return "UpdateRequest{" + + super.toString() + + "}"; + } +} diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/worker/rest/BaseApiResource.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/worker/rest/BaseApiResource.java new file mode 100644 index 0000000000000..597decc39343f --- /dev/null +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/worker/rest/BaseApiResource.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.functions.runtime.worker.rest; + +import org.apache.pulsar.functions.runtime.worker.FunctionStateManager; +import org.apache.pulsar.functions.runtime.worker.request.ServiceRequestManager; +import org.apache.pulsar.functions.runtime.worker.WorkerConfig; + +import javax.servlet.ServletContext; +import javax.ws.rs.core.Context; + +public class BaseApiResource { + + public static final String ATTRIBUTE_WORKER_CONFIG = "config"; + public static final String ATTRIBUTE_WORKER_FUNCTION_STATE_MANAGER = "function-state-manager"; + public static final String ATTRIBUTE_WORKER_SERVICE_REQUEST_MANAGER = "worker-service-request-manager"; + + private WorkerConfig workerConfig; + private FunctionStateManager functionStateManager; + private ServiceRequestManager serviceRequestManager; + + @Context + protected ServletContext servletContext; + + public WorkerConfig getWorkerConfig() { + if (this.workerConfig == null) { + this.workerConfig = (WorkerConfig) servletContext.getAttribute(ATTRIBUTE_WORKER_CONFIG); + } + return this.workerConfig; + } + + public FunctionStateManager getWorkerFunctionStateManager() { + if (this.functionStateManager == null) { + this.functionStateManager = (FunctionStateManager) servletContext.getAttribute(ATTRIBUTE_WORKER_FUNCTION_STATE_MANAGER); + } + return this.functionStateManager; + } + + public ServiceRequestManager getWorkerServiceRequestManager() { + if (this.serviceRequestManager == null) { + this.serviceRequestManager = (ServiceRequestManager) servletContext.getAttribute(ATTRIBUTE_WORKER_SERVICE_REQUEST_MANAGER); + } + return this.serviceRequestManager; + } +} diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/worker/rest/ConfigurationResource.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/worker/rest/ConfigurationResource.java new file mode 100644 index 0000000000000..cd48cbd963f40 --- /dev/null +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/worker/rest/ConfigurationResource.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.functions.runtime.worker.rest; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; + +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; + +@Path("/") +public class ConfigurationResource { + @Path("version") + @GET + @Produces(MediaType.APPLICATION_JSON) + public Response release() throws JsonProcessingException { + final ObjectMapper mapper = new ObjectMapper(); + final ObjectNode node = mapper.createObjectNode(); + node.put("version", "version.number"); + + return Response.ok() + .type(MediaType.APPLICATION_JSON) + .entity(mapper + .writerWithDefaultPrettyPrinter() + .writeValueAsString(node)) + .build(); + } +} diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/worker/rest/FunctionStateListener.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/worker/rest/FunctionStateListener.java new file mode 100644 index 0000000000000..e58d1cb901703 --- /dev/null +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/worker/rest/FunctionStateListener.java @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.functions.runtime.worker.rest; + +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.functions.runtime.worker.FunctionStateManager; +import org.apache.pulsar.functions.runtime.worker.Utils; +import org.apache.pulsar.functions.runtime.worker.WorkerConfig; +import org.apache.pulsar.functions.runtime.worker.request.DeregisterRequest; +import org.apache.pulsar.functions.runtime.worker.request.ServiceRequest; +import org.apache.pulsar.functions.runtime.worker.request.UpdateRequest; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +public class FunctionStateListener implements Runnable{ + + private static final Logger LOG = LoggerFactory.getLogger(FunctionStateListener.class); + + PulsarClient client; + Consumer consumer; + + private final WorkerConfig workerConfig; + private final FunctionStateManager functionStateManager; + + public FunctionStateListener(WorkerConfig workerConfig, FunctionStateManager functionStateManager) throws PulsarClientException { + this.workerConfig = workerConfig; + this.functionStateManager = functionStateManager; + this.client = PulsarClient.create(workerConfig.getPulsarBrokerRootUrl()); + this.consumer = client.subscribe(workerConfig.getFunctionMetadataTopic(), workerConfig.getFunctionMetadataTopicSubscription()); + } + + /** + * Listens for message from the FMT + */ + @Override + public void run() { + try { + while (true) { + // Wait for a message + Message msg = consumer.receive(); + + try { + ServiceRequest serviceRequest = (ServiceRequest) Utils.getObject(msg.getData()); + LOG.debug("Received Service Request: {}", serviceRequest); + + switch(serviceRequest.getRequestType()) { + case UPDATE: + this.functionStateManager.processUpdate((UpdateRequest) serviceRequest); + break; + case DELETE: + this.functionStateManager.proccessDeregister((DeregisterRequest) serviceRequest); + break; + default: + LOG.warn("Received request with unrecognized type: {}", serviceRequest); + } + } catch (IOException | ClassNotFoundException e) { + LOG.error("Error occured at listener: {}", e.getMessage(), e); + } + + // Acknowledge the message so that it can be deleted by broker + consumer.acknowledgeAsync(msg); + } + } catch (PulsarClientException e) { + LOG.error("Error receiving message from pulsar consumer", e); + } + } + + public String getThreadName() { + return "worker-listener-thread-" + this.workerConfig.getWorkerId(); + } +} diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/worker/rest/Resources.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/worker/rest/Resources.java new file mode 100644 index 0000000000000..f21b68d01c7a0 --- /dev/null +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/worker/rest/Resources.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.functions.runtime.worker.rest; + +import org.apache.pulsar.functions.runtime.worker.rest.api.v1.ApiV1Resource; +import org.glassfish.jersey.media.multipart.MultiPartFeature; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +public final class Resources { + + private Resources() { + } + + static Set> get() { + return new HashSet<>(getClasses()); + } + + private static List> getClasses() { + return Arrays.asList( + ConfigurationResource.class, + ApiV1Resource.class, + MultiPartFeature.class + ); + } +} \ No newline at end of file diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/worker/rest/RestUtils.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/worker/rest/RestUtils.java new file mode 100644 index 0000000000000..33c6f598ec556 --- /dev/null +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/worker/rest/RestUtils.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.functions.runtime.worker.rest; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ObjectNode; + +import java.util.List; + +public class RestUtils { + private RestUtils() { + + } + + public static ObjectNode createBaseMessage(String message) { + final ObjectMapper mapper = new ObjectMapper(); + return mapper.createObjectNode().put("message", message); + } + + public static String createMessage(String message) { + return createBaseMessage(message).toString(); + } + + public static String createValidationError(String message, List missing) { + ObjectNode node = createBaseMessage(message); + ObjectNode errors = node.putObject("errors"); + ArrayNode missingParameters = errors.putArray("missing_parameters"); + for (String param : missing) { + missingParameters.add(param); + } + + return node.toString(); + } + + public static class Message { + + public static class MessageBuilder { + ObjectNode objectNode = new ObjectMapper().createObjectNode(); + public MessageBuilder add(String key, String value) { + objectNode.put(key, value); + return this; + } + + public String build() { + return this.objectNode.toString(); + } + } + + public static MessageBuilder newBuilder() { + return new MessageBuilder(); + } + } +} diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/worker/rest/WorkerServer.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/worker/rest/WorkerServer.java new file mode 100644 index 0000000000000..326af42b7cbac --- /dev/null +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/worker/rest/WorkerServer.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.functions.runtime.worker.rest; + +import org.apache.pulsar.functions.runtime.worker.FunctionStateManager; +import org.apache.pulsar.functions.runtime.worker.WorkerConfig; +import org.apache.pulsar.functions.runtime.worker.request.ServiceRequestManager; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.servlet.ServletContextHandler; +import org.eclipse.jetty.servlet.ServletHolder; +import org.glassfish.jersey.server.ResourceConfig; +import org.glassfish.jersey.servlet.ServletContainer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.BindException; +import java.net.URI; + +public class WorkerServer implements Runnable{ + + private static final Logger LOG = LoggerFactory.getLogger(WorkerServer.class); + + private WorkerConfig workerConfig; + private FunctionStateManager functionStateManager; + + + public WorkerServer(WorkerConfig workerConfig, FunctionStateManager functionStateManager) { + this.workerConfig = workerConfig; + this.functionStateManager = functionStateManager; + } + + private static String getErrorMessage(Server server, int port, Exception ex) { + if (ex instanceof BindException) { + final URI uri = server.getURI(); + return String.format("%s http://%s:%d", ex.getMessage(), uri.getHost(), port); + } + + return ex.getMessage(); + } + + @Override + public void run() { + final Server server = new Server(this.workerConfig.getWorkerPort()); + + final ResourceConfig config = new ResourceConfig(Resources.get()); + + final ServletContextHandler contextHandler = + new ServletContextHandler(ServletContextHandler.NO_SESSIONS); + + contextHandler.setAttribute(BaseApiResource.ATTRIBUTE_WORKER_CONFIG, this.workerConfig); + contextHandler.setAttribute(BaseApiResource.ATTRIBUTE_WORKER_FUNCTION_STATE_MANAGER, this.functionStateManager); + contextHandler.setContextPath("/"); + + server.setHandler(contextHandler); + + final ServletHolder apiServlet = + new ServletHolder(new ServletContainer(config)); + + contextHandler.addServlet(apiServlet, "/*"); + try { + server.start(); + + LOG.info("Worker Server started at {}", server.getURI()); + + server.join(); + } catch (Exception ex) { + final String message = getErrorMessage(server, this.workerConfig.getWorkerPort(), ex); + LOG.error(message); + System.exit(1); + } finally { + server.destroy(); + } + } + + public String getThreadName() { + return "worker-server-thread-" + this.workerConfig.getWorkerId(); + } +} diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/worker/rest/api/v1/ApiV1Resource.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/worker/rest/api/v1/ApiV1Resource.java new file mode 100644 index 0000000000000..252eccac67e23 --- /dev/null +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/worker/rest/api/v1/ApiV1Resource.java @@ -0,0 +1,415 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.functions.runtime.worker.rest.api.v1; + +import com.google.gson.Gson; +import org.apache.pulsar.functions.runtime.worker.FunctionState; +import org.apache.pulsar.functions.runtime.worker.FunctionStateManager; +import org.apache.pulsar.functions.runtime.worker.Worker; +import org.apache.pulsar.functions.runtime.worker.request.RequestResult; +import org.apache.pulsar.functions.runtime.worker.WorkerConfig; +import org.apache.pulsar.functions.runtime.worker.rest.BaseApiResource; +import org.apache.pulsar.functions.runtime.worker.rest.RestUtils; +import org.glassfish.jersey.media.multipart.FormDataContentDisposition; +import org.glassfish.jersey.media.multipart.FormDataParam; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.ws.rs.Consumes; +import javax.ws.rs.DELETE; +import javax.ws.rs.GET; +import javax.ws.rs.POST; +import javax.ws.rs.PUT; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import java.io.IOException; +import java.io.InputStream; +import java.net.URI; +import java.util.Collection; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +@Path("/admin/functions") +public class ApiV1Resource extends BaseApiResource { + + private static final Logger LOG = LoggerFactory.getLogger(ApiV1Resource.class); + + @POST + @Path("/{tenant}/{namespace}/{functionName}") + @Consumes(MediaType.MULTIPART_FORM_DATA) + public Response registerFunction(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace, + final @PathParam("functionName") String functionName, + final @FormDataParam("data") InputStream uploadedInputStream, + final @FormDataParam("data") FormDataContentDisposition fileDetail, + final @FormDataParam("sinkTopic") String sinkTopic, + final @FormDataParam("sourceTopic") String sourceTopic, + final @FormDataParam("inputSerdeClassName") String inputSerdeClassName, + final @FormDataParam("outputSerdeClassName") String outputSerdeClassName, + final @FormDataParam("className") String className) { + + // validate parameters + try { + validateRegisterRequestParams(tenant, namespace, functionName, + uploadedInputStream, fileDetail, sinkTopic, sourceTopic, + inputSerdeClassName, outputSerdeClassName, className); + } catch (IllegalArgumentException e) { + return Response.status(Response.Status.BAD_REQUEST) + .type(MediaType.APPLICATION_JSON) + .entity(RestUtils.createMessage(e.getMessage())).build(); + } + + FunctionStateManager functionStateManager = getWorkerFunctionStateManager(); + + if (functionStateManager.containsFunction(tenant, namespace, functionName)) { + return Response.status(Response.Status.BAD_REQUEST) + .type(MediaType.APPLICATION_JSON) + .entity(RestUtils.createMessage(String.format("Function %s already exist", functionName))).build(); + } + + WorkerConfig workerConfig = getWorkerConfig(); + FunctionState functionState = new FunctionState(); + functionState.setCreateTime(System.currentTimeMillis()); + functionState.setName(functionName); + functionState.setNamespace(namespace); + functionState.setTenant(tenant); + functionState.setPackageLocation( + Utils.getPackageURI( + Utils.getDestPackageNamespaceURI(workerConfig, namespace), + fileDetail.getFileName() + ).toString() + ); + + functionState.setSourceTopic(sourceTopic); + functionState.setSinkTopic(sinkTopic); + functionState.setVersion(0); + functionState.setWorkerId(workerConfig.getWorkerId()); + functionState.setInputSerdeClassName(inputSerdeClassName); + functionState.setOutputSerdeClassName(outputSerdeClassName); + functionState.setClassName(className); + + return updateRequest(functionState, uploadedInputStream, fileDetail); + } + + @PUT + @Path("/{tenant}/{namespace}/{functionName}") + @Consumes(MediaType.MULTIPART_FORM_DATA) + public Response updateFunction(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace, + final @PathParam("functionName") String functionName, + final @FormDataParam("data") InputStream uploadedInputStream, + final @FormDataParam("data") FormDataContentDisposition fileDetail, + final @FormDataParam("sinkTopic") String sinkTopic, + final @FormDataParam("sourceTopic") String sourceTopic, + final @FormDataParam("inputSerdeClassName") String inputSerdeClassName, + final @FormDataParam("outputSerdeClassName") String outputSerdeClassName, + final @FormDataParam("className") String className) { + + // validate parameters + try { + validateUpdateRequestParams(tenant, namespace, functionName, + uploadedInputStream, fileDetail, sinkTopic, sourceTopic, + inputSerdeClassName, outputSerdeClassName, className); + } catch (IllegalArgumentException e) { + return Response.status(Response.Status.BAD_REQUEST) + .type(MediaType.APPLICATION_JSON) + .entity(RestUtils.createMessage(e.getMessage())).build(); + } + + FunctionStateManager functionStateManager = getWorkerFunctionStateManager(); + + if (!functionStateManager.containsFunction(tenant, namespace, functionName)) { + return Response.status(Response.Status.BAD_REQUEST) + .type(MediaType.APPLICATION_JSON) + .entity(RestUtils.createMessage(String.format("Function %s doesn't exist", functionName))).build(); + } + + WorkerConfig workerConfig = getWorkerConfig(); + FunctionState functionState = new FunctionState(); + functionState.setCreateTime(System.currentTimeMillis()); + functionState.setName(functionName); + functionState.setNamespace(namespace); + functionState.setTenant(tenant); + functionState.setPackageLocation( + Utils.getPackageURI( + Utils.getDestPackageNamespaceURI(workerConfig, namespace), + fileDetail.getFileName() + ).toString() + ); + + functionState.setSourceTopic(sourceTopic); + functionState.setSinkTopic(sinkTopic); + functionState.setVersion(0); + functionState.setWorkerId(workerConfig.getWorkerId()); + functionState.setInputSerdeClassName(inputSerdeClassName); + functionState.setOutputSerdeClassName(outputSerdeClassName); + functionState.setClassName(className); + + return updateRequest(functionState, uploadedInputStream, fileDetail); + } + + + @DELETE + @Path("/{tenant}/{namespace}/{functionName}") + public Response deregisterFunction(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace, + final @PathParam("functionName") String functionName) { + + // validate parameters + try { + validateDeregisterRequestParams(tenant, namespace, functionName); + } catch (IllegalArgumentException e) { + return Response.status(Response.Status.BAD_REQUEST) + .type(MediaType.APPLICATION_JSON) + .entity(RestUtils.createMessage(e.getMessage())).build(); + } + + FunctionStateManager functionStateManager = getWorkerFunctionStateManager(); + if (!functionStateManager.containsFunction(tenant, namespace, functionName)) { + return Response.status(Response.Status.BAD_REQUEST) + .type(MediaType.APPLICATION_JSON) + .entity(RestUtils.createMessage(String.format("Function %s doesn't exist", functionName))).build(); + } + + CompletableFuture completableFuture + = functionStateManager.deregisterFunction(tenant, namespace, functionName); + + RequestResult requestResult = null; + try { + requestResult = completableFuture.get(); + if (!requestResult.isSuccess()) { + return Response.status(Response.Status.BAD_REQUEST) + .type(MediaType.APPLICATION_JSON) + .entity(requestResult.toJson()) + .build(); + } + } catch (InterruptedException | ExecutionException e) { + LOG.error("Error completeing request", e); + return Response.serverError() + .type(MediaType.APPLICATION_JSON) + .entity(RestUtils.createMessage(e.getMessage())) + .build(); + } + + return Response.status(Response.Status.OK).entity(requestResult.toJson()).build(); + } + + @GET + @Path("/{tenant}/{namespace}/{functionName}") + public Response getFunctionInfo(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace, + final @PathParam("functionName") String functionName) { + + // validate parameters + try { + validateGetFunctionRequestParams(tenant, namespace, functionName); + } catch (IllegalArgumentException e) { + return Response.status(Response.Status.BAD_REQUEST) + .type(MediaType.APPLICATION_JSON) + .entity(RestUtils.createMessage(e.getMessage())).build(); + } + + FunctionStateManager functionStateManager = getWorkerFunctionStateManager(); + if (!functionStateManager.containsFunction(tenant, namespace, functionName)) { + return Response.status(Response.Status.BAD_REQUEST) + .type(MediaType.APPLICATION_JSON) + .entity(RestUtils.createMessage(String.format("Function %s doesn't exist", functionName))).build(); + } + + FunctionState functionState = functionStateManager.getFunction(tenant, namespace, functionName); + return Response.status(Response.Status.OK).entity(functionState.toJson()).build(); + } + + @GET + @Path("/{tenant}/{namespace}") + public Response listFunctions(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace) { + + // validate parameters + try { + validateListFunctionRequestParams(tenant, namespace); + } catch (IllegalArgumentException e) { + return Response.status(Response.Status.BAD_REQUEST) + .type(MediaType.APPLICATION_JSON) + .entity(RestUtils.createMessage(e.getMessage())).build(); + } + + FunctionStateManager functionStateManager = getWorkerFunctionStateManager(); + + Collection functionStateList = functionStateManager.listFunction(tenant, namespace); + + return Response.status(Response.Status.OK).entity(new Gson().toJson(functionStateList.toArray())).build(); + } + + private Response updateRequest(FunctionState functionState, + InputStream uploadedInputStream, + FormDataContentDisposition fileDetail) { + WorkerConfig workerConfig = getWorkerConfig(); + + // Submit to FMT + FunctionStateManager functionStateManager = getWorkerFunctionStateManager(); + + CompletableFuture completableFuture + = functionStateManager.updateFunction(functionState); + + RequestResult requestResult = null; + try { + requestResult = completableFuture.get(); + if (!requestResult.isSuccess()) { + return Response.status(Response.Status.BAD_REQUEST) + .type(MediaType.APPLICATION_JSON) + .entity(requestResult.toJson()) + .build(); + } + } catch (InterruptedException | ExecutionException e) { + LOG.error("Error completeing request", e); + return Response.serverError() + .type(MediaType.APPLICATION_JSON) + .entity(RestUtils.createMessage(e.getMessage())) + .build(); + } + + // Upload to bookeeper + URI packageURI = null; + try { + packageURI = Utils.uploadToBookeeper(uploadedInputStream, fileDetail, + functionState.getNamespace(), workerConfig); + } catch (IOException e) { + LOG.error("Error uploading file {}", fileDetail.getFileName(), e); + return Response.serverError() + .type(MediaType.APPLICATION_JSON) + .entity(RestUtils.createMessage(e.getMessage())) + .build(); + } + + return Response.status(Response.Status.OK).entity(requestResult.toJson()).build(); + } + + private void validateListFunctionRequestParams(String tenant, String namespace) throws IllegalArgumentException { + + if (tenant == null) { + throw new IllegalArgumentException("Tenant is not provided"); + } + if (namespace == null) { + throw new IllegalArgumentException("Namespace is not provided"); + } + } + + private void validateGetFunctionRequestParams(String tenant, + String namespace, + String functionName) throws IllegalArgumentException { + + if (tenant == null) { + throw new IllegalArgumentException("Tenant is not provided"); + } + if (namespace == null) { + throw new IllegalArgumentException("Namespace is not provided"); + } + if (functionName == null) { + throw new IllegalArgumentException("Function Name is not provided"); + } + } + + private void validateDeregisterRequestParams(String tenant, + String namespace, + String functionName) throws IllegalArgumentException { + + if (tenant == null) { + throw new IllegalArgumentException("Tenant is not provided"); + } + if (namespace == null) { + throw new IllegalArgumentException("Namespace is not provided"); + } + if (functionName == null) { + throw new IllegalArgumentException("Function Name is not provided"); + } + } + + private void validateRegisterRequestParams(String tenant, + String namespace, + String functionName, + InputStream uploadedInputStream, + FormDataContentDisposition fileDetail, + String sinkTopic, + String inputTopic, + String inputSerdeClassName, + String outputSerdeClassName, + String className) throws IllegalArgumentException { + + if (tenant == null) { + throw new IllegalArgumentException("Tenant is not provided"); + } + if (namespace == null) { + throw new IllegalArgumentException("Namespace is not provided"); + } + if (functionName == null) { + throw new IllegalArgumentException("Function Name is not provided"); + } + if (uploadedInputStream == null || fileDetail == null) { + throw new IllegalArgumentException("Function package not provided"); + } + if (inputTopic == null) { + throw new IllegalArgumentException("Input Topic is not provided"); + } + if (inputSerdeClassName == null) { + throw new IllegalArgumentException("inputSerdeClassName is not provided"); + } + if (outputSerdeClassName == null) { + throw new IllegalArgumentException("outputSerdeClassName is not provided"); + } + if (className == null) { + throw new IllegalArgumentException("className is not provided"); + } + if (!Utils.namespaceExists(namespace, getWorkerConfig())) { + throw new IllegalArgumentException(String.format("Namespace %s doesn't exist", namespace)); + } + } + + private void validateUpdateRequestParams(String tenant, + String namespace, + String functionName, + InputStream uploadedInputStream, + FormDataContentDisposition fileDetail, + String sinkTopic, + String inputTopic, + String inputSerdeClassName, + String outputSerdeClassName, + String className) throws IllegalArgumentException { + + if (tenant == null) { + throw new IllegalArgumentException("Tenant is not provided"); + } + if (namespace == null) { + throw new IllegalArgumentException("Namespace is not provided"); + } + if (functionName == null) { + throw new IllegalArgumentException("Function Name is not provided"); + } + if (uploadedInputStream == null && fileDetail == null && sinkTopic == null && inputTopic == null + && inputSerdeClassName == null && outputSerdeClassName == null && className == null) { + throw new IllegalArgumentException("No updates found"); + } + + if (!Utils.namespaceExists(namespace, getWorkerConfig())) { + throw new IllegalArgumentException(String.format("Namespace %s doesn't exist", namespace)); + } + } +} diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/worker/rest/api/v1/Utils.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/worker/rest/api/v1/Utils.java new file mode 100644 index 0000000000000..6da2cfb82a8dc --- /dev/null +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/worker/rest/api/v1/Utils.java @@ -0,0 +1,134 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.functions.runtime.worker.rest.api.v1; + +import org.apache.distributedlog.AppendOnlyStreamWriter; +import org.apache.distributedlog.DistributedLogConfiguration; +import org.apache.distributedlog.api.DistributedLogManager; +import org.apache.distributedlog.api.namespace.Namespace; +import org.apache.distributedlog.api.namespace.NamespaceBuilder; +import org.apache.pulsar.functions.runtime.worker.WorkerConfig; +import org.apache.pulsar.functions.runtime.worker.request.RequestResult; +import org.apache.pulsar.functions.runtime.worker.rest.RestUtils; +import org.apache.pulsar.functions.runtime.worker.rest.api.v1.dlog.DLOutputStream; +import org.glassfish.jersey.media.multipart.FormDataContentDisposition; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.URI; + +public final class Utils { + + private static final Logger LOG = LoggerFactory.getLogger(Utils.class); + + + private Utils(){} + + static boolean namespaceExists(String namespace, WorkerConfig workerConfig) { + + String zookeeperHost = workerConfig.getZookeeperUri().getHost(); + int zookeeperPort = workerConfig.getZookeeperUri().getPort(); + String destTopologyNamespaceURI = String.format("distributedlog://%s:%d/%s", zookeeperHost, zookeeperPort, namespace); + + URI uri = URI.create(destTopologyNamespaceURI); + + try { + NamespaceBuilder.newBuilder().clientId("pulsar-functions-uploader").conf(getDlogConf(workerConfig)).uri(uri).build(); + } catch (IOException e) { + return false; + } + return true; + } + + static String getDestPackageNamespaceURI(WorkerConfig workerConfig, String namespace) { + String zookeeperHost = workerConfig.getZookeeperUri().getHost(); + int zookeeperPort = workerConfig.getZookeeperUri().getPort(); + return String.format("distributedlog://%s:%d/%s", zookeeperHost, zookeeperPort, namespace); + } + + static URI getPackageURI(String destPackageNamespaceURI, String packageName) { + return URI.create(String.format("%s/%s", destPackageNamespaceURI, packageName)); + } + + static URI uploadToBookeeper(InputStream uploadedInputStream, + FormDataContentDisposition fileDetail, + String namespace, WorkerConfig workerConfig) + throws IOException { + String packageName = fileDetail.getFileName(); + String destPackageNamespaceURI = getDestPackageNamespaceURI(workerConfig, namespace); + URI packageURI = getPackageURI(destPackageNamespaceURI, packageName); + + DistributedLogConfiguration conf = getDlogConf(workerConfig); + + URI uri = URI.create(destPackageNamespaceURI); + + Namespace dlogNamespace = null; + dlogNamespace = NamespaceBuilder.newBuilder() + .clientId("pulsar-functions-uploader").conf(conf).uri(uri).build(); + + // if the dest directory does not exist, create it. + DistributedLogManager dlm = null; + AppendOnlyStreamWriter writer = null; + + if (dlogNamespace.logExists(packageName)) { + // if the destination file exists, write a log message + LOG.info(String.format("Target function file already exists at '%s'. Overwriting it now", + packageURI.toString())); + dlogNamespace.deleteLog(packageName); + } + // copy the topology package to target working directory + LOG.info(String.format("Uploading function package '%s' to target DL at '%s'", + fileDetail.getName(), packageURI.toString())); + + + dlm = dlogNamespace.openLog(fileDetail.getFileName()); + writer = dlm.getAppendOnlyStreamWriter(); + + try (OutputStream out = new DLOutputStream(dlm, writer)) { + int read = 0; + byte[] bytes = new byte[1024]; + while ((read = uploadedInputStream.read(bytes)) != -1) { + out.write(bytes, 0, read); + } + out.flush(); + } + return packageURI; + } + + static DistributedLogConfiguration getDlogConf(WorkerConfig workerConfig) { + int numReplicas = workerConfig.getNumFunctionPackageReplicas(); + + return new DistributedLogConfiguration() + .setWriteLockEnabled(false) + .setOutputBufferSize(256 * 1024) // 256k + .setPeriodicFlushFrequencyMilliSeconds(0) // disable periodical flush + .setImmediateFlushEnabled(false) // disable immediate flush + .setLogSegmentRollingIntervalMinutes(0) // disable time-based rolling + .setMaxLogSegmentBytes(Long.MAX_VALUE) // disable size-based rolling + .setExplicitTruncationByApplication(true) // no auto-truncation + .setRetentionPeriodHours(Integer.MAX_VALUE) // long retention + .setEnsembleSize(numReplicas) // replica settings + .setWriteQuorumSize(numReplicas) + .setAckQuorumSize(numReplicas) + .setUseDaemonThread(true); + } +} diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/worker/rest/api/v1/dlog/DLInputStream.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/worker/rest/api/v1/dlog/DLInputStream.java new file mode 100644 index 0000000000000..d90692c840b76 --- /dev/null +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/worker/rest/api/v1/dlog/DLInputStream.java @@ -0,0 +1,145 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.functions.runtime.worker.rest.api.v1.dlog; + +import org.apache.distributedlog.DLSN; +import org.apache.distributedlog.LogRecordWithDLSN; +import org.apache.distributedlog.api.DistributedLogManager; +import org.apache.distributedlog.api.LogReader; +import org.apache.distributedlog.exceptions.EndOfStreamException; + +import java.io.IOException; +import java.io.InputStream; + +public class DLInputStream extends InputStream { + + private LogRecordWithInputStream currentLogRecord = null; + private final DistributedLogManager dlm; + private LogReader reader; + private boolean eos = false; + + // Cache the input stream for a log record. + private static class LogRecordWithInputStream { + private final InputStream payloadStream; + private final LogRecordWithDLSN logRecord; + + LogRecordWithInputStream(LogRecordWithDLSN logRecord) { + this.logRecord = logRecord; + this.payloadStream = logRecord.getPayLoadInputStream(); + } + + InputStream getPayLoadInputStream() { + return payloadStream; + } + + LogRecordWithDLSN getLogRecord() { + return logRecord; + } + + // The last txid of the log record is the position of the next byte in the stream. + // Subtract length to get starting offset. + long getOffset() { + return logRecord.getTransactionId() - logRecord.getPayload().length; + } + } + + /** + * Construct distributedlog input stream + * + * @param dlm the Distributed Log Manager to access the stream + */ + public DLInputStream(DistributedLogManager dlm) throws IOException { + this.dlm = dlm; + reader = dlm.getInputStream(DLSN.InitialDLSN); + } + + /** + * Get input stream representing next entry in the + * ledger. + * + * @return input stream, or null if no more entries + */ + private LogRecordWithInputStream nextLogRecord() throws IOException { + try { + return nextLogRecord(reader); + } catch (EndOfStreamException e) { + eos = true; + return null; + } + } + + private static LogRecordWithInputStream nextLogRecord(LogReader reader) throws IOException { + LogRecordWithDLSN record = reader.readNext(false); + + if (null != record) { + return new LogRecordWithInputStream(record); + } else { + record = reader.readNext(false); + if (null != record) { + return new LogRecordWithInputStream(record); + } else { + return null; + } + } + } + + @Override + public int read() throws IOException { + byte[] b = new byte[1]; + if (read(b, 0, 1) != 1) { + return -1; + } else { + return b[0]; + } + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + if (eos) { + return -1; + } + + int read = 0; + if (currentLogRecord == null) { + currentLogRecord = nextLogRecord(); + if (currentLogRecord == null) { + return read; + } + } + + while (read < len) { + int thisread = currentLogRecord.getPayLoadInputStream().read(b, off + read, len - read); + if (thisread == -1) { + currentLogRecord = nextLogRecord(); + if (currentLogRecord == null) { + return read; + } + } else { + read += thisread; + } + } + return read; + } + + @Override + public void close() throws IOException { + reader.close(); + dlm.close(); + } +} diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/worker/rest/api/v1/dlog/DLOutputStream.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/worker/rest/api/v1/dlog/DLOutputStream.java new file mode 100644 index 0000000000000..876070594b8c9 --- /dev/null +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/worker/rest/api/v1/dlog/DLOutputStream.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.functions.runtime.worker.rest.api.v1.dlog; + +import org.apache.distributedlog.AppendOnlyStreamWriter; +import org.apache.distributedlog.api.DistributedLogManager; + +import java.io.IOException; +import java.io.OutputStream; + +/** + * DistributedLog Output Stream. + */ +public class DLOutputStream extends OutputStream { + + private final DistributedLogManager dlm; + private final AppendOnlyStreamWriter writer; + + public DLOutputStream(DistributedLogManager dlm, + AppendOnlyStreamWriter writer) { + this.dlm = dlm; + this.writer = writer; + } + + @Override + public void write(int b) throws IOException { + byte[] data = new byte[] { + (byte) b + }; + write(data); + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + // TODO: avoid array copy by using the new bytebuf api + byte[] newData = new byte[len]; + System.arraycopy(b, off, newData, 0, len); + write(newData); + } + + @Override + public void write(byte[] b) throws IOException { + writer.write(b); + } + + @Override + public void flush() throws IOException { + writer.force(false); + } + + @Override + public void close() throws IOException { + writer.markEndOfStream(); + writer.close(); + dlm.close(); + } +} diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/worker/FunctionStateManagerTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/worker/FunctionStateManagerTest.java new file mode 100644 index 0000000000000..1d6e2a4c02262 --- /dev/null +++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/worker/FunctionStateManagerTest.java @@ -0,0 +1,4 @@ +package org.apache.pulsar.functions.runtime.worker; + +public class FunctionStateManagerTest { +} diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/worker/WorkerTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/worker/WorkerTest.java new file mode 100644 index 0000000000000..123fdec8f5370 --- /dev/null +++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/worker/WorkerTest.java @@ -0,0 +1,61 @@ +package org.apache.pulsar.functions.runtime.worker; + +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.functions.runtime.worker.Worker; +import org.apache.pulsar.functions.runtime.worker.WorkerConfig; +import org.apache.pulsar.functions.runtime.worker.rest.WorkerServer; +import org.junit.Test; + +import java.net.URI; +import java.net.URISyntaxException; + +public class WorkerTest { + + private static void runWorker(String workerId, int port) throws PulsarClientException, URISyntaxException, InterruptedException { + WorkerConfig workerConfig = new WorkerConfig(); + workerConfig.setWorkerPort(port); + workerConfig.setZookeeperUri(new URI("http://127.0.0.1:2181")); + workerConfig.setNumFunctionPackageReplicas(1); + workerConfig.setFunctionMetadataTopic("persistent://sample/standalone/ns1/fmt"); + workerConfig.setPulsarBrokerRootUrl("pulsar://localhost:6650"); + workerConfig.setWorkerId(workerId); + Worker worker = new Worker(workerConfig); + worker.start(); + } + + @Test + public void testWorkerServer() throws URISyntaxException, InterruptedException, PulsarClientException { + +// Thread worker1 = new Thread(new Runnable() { +// @Override +// public void run() { +// try { +// runWorker("worker-1", 8001); +// } catch (PulsarClientException | URISyntaxException | InterruptedException e) { +// e.printStackTrace(); +// } +// } +// }); +// +// Thread worker2 = new Thread(new Runnable() { +// @Override +// public void run() { +// try { +// runWorker("worker-2", 8002); +// } catch (PulsarClientException | URISyntaxException | InterruptedException e) { +// e.printStackTrace(); +// } +// } +// }); +// +// worker1.setName("worker-1"); +// worker2.setName("worker-2"); +// +// worker1.start(); +// worker2.start(); +// +// worker1.join(); +// worker2.join(); + + } +}