Skip to content

Commit

Permalink
Initial implementation of Pulsar Function Worker (apache#47)
Browse files Browse the repository at this point in the history
  • Loading branch information
jerrypeng authored and sijie committed Mar 4, 2018
1 parent 5c4b286 commit 570d59c
Show file tree
Hide file tree
Showing 34 changed files with 2,345 additions and 26 deletions.
2 changes: 1 addition & 1 deletion pulsar-functions/bin/pulsar-functions
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ BINDIR=$(dirname "$0")
PULSAR_HOME=`cd $BINDIR/..;pwd`

DEFAULT_CLIENT_CONF=$PULSAR_HOME/conf/client.conf
DEFAULT_LOG_CONF=$PULSAR_HOME/conf/log4j.properties
DEFAULT_LOG_CONF=$PULSAR_HOME/conf/log4j.shell.properties

if [ -f "$PULSAR_HOME/conf/pulsar_tools_env.sh" ]
then
Expand Down
2 changes: 1 addition & 1 deletion pulsar-functions/conf/client.conf
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
#

# Pulsar Client configuration
webServiceUrl=http://localhost:8080/
webServiceUrl=http://localhost:8001/
brokerServiceUrl=pulsar://localhost:6650/
#authPlugin=
#authParams=
Expand Down
2 changes: 1 addition & 1 deletion pulsar-functions/conf/example.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,6 @@
#

tenant: "test"
nameSpace: "test"
namespace: "test-namespace"
name: "example"
className: "org.apache.pulsar.functions.api.examples.ExclamationFunction"
2 changes: 1 addition & 1 deletion pulsar-functions/conf/log4j.shell.properties
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

# DEFAULT: console appender only
# Define some default values that can be overridden by system properties
bookkeeper.root.logger=ERROR,CONSOLE
bookkeeper.root.logger=INFO,CONSOLE

log4j.rootLogger=${bookkeeper.root.logger}

Expand Down
2 changes: 1 addition & 1 deletion pulsar-functions/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<packaging>pom</packaging>
<parent>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public void prepare() {
fnConfig.setSinkTopic("test-sink");
fnConfig.setSourceTopic("test-source");
fnConfig.setTenant("test-tenant");
fnConfig.setNameSpace("test-namespace");
fnConfig.setNamespace("test-namespace");
fnConfig.setVersion(UUID.randomUUID().toString());

JavaInstanceConfig config =
Expand Down
75 changes: 68 additions & 7 deletions pulsar-functions/runtime/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,15 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<properties>
<jetty.version>9.4.8.v20171121</jetty.version>
<jersery.version>2.26</jersery.version>
<dlog.version>0.5.0</dlog.version>
<gson.version>2.8.2</gson.version>
<jackson.version>2.9.2</jackson.version>
</properties>

<parent>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-functions</artifactId>
Expand Down Expand Up @@ -58,19 +67,19 @@
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-yaml</artifactId>
<version>2.9.2</version>
<version>${jackson.version}</version>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>2.9.2</version>
<version>${jackson.version}</version>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.9.2</version>
<version>${jackson.version}</version>
</dependency>

<dependency>
Expand All @@ -86,20 +95,72 @@
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.glassfish.jersey.media</groupId>
<artifactId>jersey-media-json-jackson</artifactId>
<version>${jersery.version}</version>
</dependency>

<dependency>
<groupId>org.glassfish.jersey.core</groupId>
<artifactId>jersey-client</artifactId>
<artifactId>jersey-server</artifactId>
<version>${jersery.version}</version>
</dependency>

<dependency>
<groupId>org.glassfish.jersey.media</groupId>
<artifactId>jersey-media-json-jackson</artifactId>
<groupId>org.glassfish.jersey.containers</groupId>
<artifactId>jersey-container-servlet</artifactId>
<version>${jersery.version}</version>
</dependency>

<dependency>
<groupId>org.glassfish.jersey.containers</groupId>
<artifactId>jersey-container-servlet-core</artifactId>
<version>${jersery.version}</version>
</dependency>

<dependency>
<groupId>org.glassfish.jersey.inject</groupId>
<artifactId>jersey-hk2</artifactId>
<version>${jersery.version}</version>
</dependency>

<dependency>
<groupId>org.glassfish.jersey.core</groupId>
<artifactId>jersey-client</artifactId>
<version>${jersery.version}</version>
</dependency>

<dependency>
<groupId>org.glassfish.jersey.media</groupId>
<artifactId>jersey-media-multipart</artifactId>
<version>2.23</version>
<version>${jersery.version}</version>
</dependency>

<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
<version>${jetty.version}</version>
</dependency>

<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-servlet</artifactId>
<version>${jetty.version}</version>
</dependency>

<dependency>
<groupId>org.apache.distributedlog</groupId>
<artifactId>distributedlog-core</artifactId>
<version>${dlog.version}</version>
<type>jar</type>
<classifier>shaded</classifier>
</dependency>

<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>${gson.version}</version>
</dependency>

<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ class GetFunction extends FunctionsCommand {
@Override
void run_functions_cmd() throws Exception {
PulsarFunctionsAdmin a = (PulsarFunctionsAdmin)admin;
print(a.functions().getFunction(functionConfig.getTenant(), functionConfig.getNameSpace(), functionConfig.getName()));
print(a.functions().getFunction(functionConfig.getTenant(), functionConfig.getNamespace(), functionConfig.getName()));
}
}

Expand All @@ -142,7 +142,7 @@ class DeleteFunction extends FunctionsCommand {
@Override
void run_functions_cmd() throws Exception {
PulsarFunctionsAdmin a = (PulsarFunctionsAdmin)admin;
a.functions().deleteFunction(functionConfig.getTenant(), functionConfig.getNameSpace(), functionConfig.getName());
a.functions().deleteFunction(functionConfig.getTenant(), functionConfig.getNamespace(), functionConfig.getName());
print("Deleted successfully");
}
}
Expand All @@ -162,7 +162,7 @@ class ListFunctions extends FunctionsCommand {
@Override
void run_functions_cmd() throws Exception {
PulsarFunctionsAdmin a = (PulsarFunctionsAdmin)admin;
print(a.functions().getFunctions(functionConfig.getTenant(), functionConfig.getNameSpace()));
print(a.functions().getFunctions(functionConfig.getTenant(), functionConfig.getNamespace()));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,13 @@
import org.apache.pulsar.functions.fs.FunctionConfig;
import org.glassfish.jersey.media.multipart.FormDataBodyPart;
import org.glassfish.jersey.media.multipart.FormDataMultiPart;
import org.glassfish.jersey.media.multipart.file.FileDataBodyPart;

import javax.ws.rs.client.Entity;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.GenericType;
import javax.ws.rs.core.MediaType;
import java.io.FileInputStream;
import java.io.File;
import java.util.List;

public class FunctionsImpl extends BaseResource implements Functions {
Expand Down Expand Up @@ -65,7 +66,9 @@ public FunctionConfig getFunction(String tenant, String namespace, String functi
public void createFunction(FunctionConfig functionConfig, String fileName) throws PulsarAdminException {
try {
final FormDataMultiPart mp = new FormDataMultiPart();
mp.bodyPart(new FormDataBodyPart("data", new FileInputStream(fileName), MediaType.APPLICATION_OCTET_STREAM_TYPE));

mp.bodyPart(new FileDataBodyPart("data", new File(fileName), MediaType.APPLICATION_OCTET_STREAM_TYPE));

mp.bodyPart(new FormDataBodyPart("sourceTopic", functionConfig.getSourceTopic(),
MediaType.APPLICATION_JSON_TYPE));
mp.bodyPart(new FormDataBodyPart("sinkTopic", functionConfig.getSinkTopic(),
Expand All @@ -76,7 +79,7 @@ public void createFunction(FunctionConfig functionConfig, String fileName) throw
MediaType.APPLICATION_JSON_TYPE));
mp.bodyPart(new FormDataBodyPart("className", functionConfig.getClassName(),
MediaType.APPLICATION_JSON_TYPE));
request(functions.path(functionConfig.getTenant()).path(functionConfig.getNameSpace()).path(functionConfig.getName()))
request(functions.path(functionConfig.getTenant()).path(functionConfig.getNamespace()).path(functionConfig.getName()))
.post(Entity.entity(mp, MediaType.MULTIPART_FORM_DATA), ErrorData.class);
} catch (Exception e) {
throw getApiException(e);
Expand All @@ -98,7 +101,7 @@ public void updateFunction(FunctionConfig functionConfig, String fileName) throw
try {
final FormDataMultiPart mp = new FormDataMultiPart();
if (fileName != null) {
mp.bodyPart(new FormDataBodyPart("data", new FileInputStream(fileName), MediaType.APPLICATION_OCTET_STREAM_TYPE));
mp.bodyPart(new FileDataBodyPart("data", new File(fileName), MediaType.APPLICATION_OCTET_STREAM_TYPE));
}
if (functionConfig.getSourceTopic() != null) {
mp.bodyPart(new FormDataBodyPart("sourceTopic", functionConfig.getSourceTopic(),
Expand All @@ -120,7 +123,7 @@ public void updateFunction(FunctionConfig functionConfig, String fileName) throw
mp.bodyPart(new FormDataBodyPart("className", functionConfig.getClassName(),
MediaType.APPLICATION_JSON_TYPE));
}
request(functions.path(functionConfig.getTenant()).path(functionConfig.getNameSpace()).path(functionConfig.getName()))
request(functions.path(functionConfig.getTenant()).path(functionConfig.getNamespace()).path(functionConfig.getName()))
.put(Entity.entity(mp, MediaType.MULTIPART_FORM_DATA), ErrorData.class);
} catch (Exception e) {
throw getApiException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public class FunctionConfig {
// tenant that the function resides in
private String tenant;
// namespace that the function belongs to
private String nameSpace;
private String namespace;
// function name
private String name;
// Function version
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
* This represents the config related to the resource limits of function calls
*/
public class LimitsConfig {
private int timeBudgetInMs;
private int maxMemory;
private int maxTimeMs;
private int maxMemoryMb;
private int maxBufferedTuples;
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,8 @@ private JavaInstanceConfig createJavaInstanceConfig() {
javaInstanceConfig.setFunctionConfig(assignmentInfo.getFunctionConfig());
javaInstanceConfig.setFunctionId(assignmentInfo.getFunctionId());
javaInstanceConfig.setFunctionVersion(assignmentInfo.getFunctionVersion());
javaInstanceConfig.setTimeBudgetInMs(limitsConfig.getTimeBudgetInMs());
javaInstanceConfig.setMaxMemory(limitsConfig.getMaxMemory());
javaInstanceConfig.setTimeBudgetInMs(limitsConfig.getMaxTimeMs());
javaInstanceConfig.setMaxMemory(limitsConfig.getMaxMemoryMb());
return javaInstanceConfig;
}
}
Loading

0 comments on commit 570d59c

Please sign in to comment.