Skip to content

Commit

Permalink
Input and Output Serdes (apache#31)
Browse files Browse the repository at this point in the history
### Motivation

input type can be different output type. so we need two serdes for each function.

### Modifications

- add inputSerde and outputSerde
- verify both input and output serde to make sure their types are consistent with function types

### Result

user run functions with input/output serdes.
  • Loading branch information
sijie committed Mar 4, 2018
1 parent 275daf0 commit a548bc2
Show file tree
Hide file tree
Showing 6 changed files with 103 additions and 45 deletions.
8 changes: 7 additions & 1 deletion pulsar-functions/run-examples.sh
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,10 @@
# under the License.
#

bin/pulsar-functions functions run --function-config conf/example.yml --sink-topic persistent://sample/standalone/ns1/test_result --source-topic persistent://sample/standalone/ns1/test_src --serde-classname org.apache.pulsar.functions.runtime.serde.Utf8StringSerDe --function-classpath `pwd`/api-examples/target/pulsar-functions-api-examples.jar
bin/pulsar-functions functions run \
--function-config conf/example.yml \
--sink-topic persistent://sample/standalone/ns1/test_result \
--source-topic persistent://sample/standalone/ns1/test_src \
--input-serde-classname org.apache.pulsar.functions.runtime.serde.Utf8StringSerDe \
--output-serde-classname org.apache.pulsar.functions.runtime.serde.Utf8StringSerDe \
--function-classpath `pwd`/api-examples/target/pulsar-functions-api-examples.jar
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,11 @@ abstract class FunctionsCommand extends CliCommand {
@Parameter(names = "--sink-topic", description = "Output Topic Name\n")
protected String sinkTopicName;

@Parameter(names = "--serde-classname", description = "SerDe\n")
protected String serDeClassName;
@Parameter(names = "--input-serde-classname", description = "Input SerDe\n")
protected String inputSerdeClassName;

@Parameter(names = "--output-serde-classname", description = "Output SerDe\n")
protected String outputSerdeClassName;

@Parameter(names = "--function-config", description = "Function Config\n")
protected String fnConfigFile;
Expand All @@ -77,8 +80,11 @@ void run() throws Exception {
if (null != className) {
functionConfig.setClassName(className);
}
if (null != serDeClassName) {
functionConfig.setSerdeClassName(serDeClassName);
if (null != inputSerdeClassName) {
functionConfig.setInputSerdeClassName(inputSerdeClassName);
}
if (null != outputSerdeClassName) {
functionConfig.setOutputSerdeClassName(outputSerdeClassName);
}
if (null != jarFiles) {
functionConfig.setJarFiles(jarFiles);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,10 @@ public class FunctionConfig {
private String name;
// function class name
private String className;
// serde class name
private String serdeClassName;
// input serde class name
private String inputSerdeClassName;
// output serde class name
private String outputSerdeClassName;
// function jar name
private List<String> jarFiles;
// source topic
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public void run() {
result = javaInstance.handleMessage(payload.messageId,
payload.topicName, payload.msgData);
ExecutionResult actualResult = ExecutionResult.fromJavaResult(result,
javaInstance.getSerDe());
javaInstance.getOutputSerDe());
payload.result.complete(actualResult);
} catch (InterruptedException ie) {
log.info("Function thread {} is interrupted", ie);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,47 +18,67 @@
*/
package org.apache.pulsar.functions.runtime.instance;

import com.google.common.collect.Sets;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import net.jodah.typetools.TypeResolver;
import org.apache.pulsar.functions.api.RawRequestHandler;
import org.apache.pulsar.functions.api.RequestHandler;
import org.apache.pulsar.functions.runtime.serde.SerDe;
import org.apache.pulsar.functions.utils.Reflections;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.*;
import java.lang.reflect.Type;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;

/**
* This is the Java Instance. This is started by the spawner using the JavaInstanceClient
* program if invoking via a process based invocation or using JavaInstance using a thread
* based invocation.
*/
@Slf4j
public class JavaInstance {
private static final Logger log = LoggerFactory.getLogger(JavaInstance.class);
private static final List<Type> supportedInputTypes = Arrays.asList(
Integer.TYPE,
Double.TYPE,
Long.TYPE,
String.class,
Short.TYPE,
Byte.TYPE,
Float.TYPE,
Map.class,
List.class,
Object.class

private static final Set<Type> supportedInputTypes = Sets.newHashSet(
Integer.TYPE,
Double.TYPE,
Long.TYPE,
String.class,
Short.TYPE,
Byte.TYPE,
Float.TYPE,
Map.class,
List.class,
Object.class
);

private static SerDe initializeSerDe(String serdeClassName, ClassLoader classLoader) {
if (null == serdeClassName) {
return null;
} else {
return Reflections.createInstance(
serdeClassName,
SerDe.class,
classLoader);
}
}

private ContextImpl context;
private RequestHandler requestHandler;
private RawRequestHandler rawRequestHandler;
private ExecutorService executorService;
private SerDe inputSerDe;
@Getter
private SerDe serDe;
private SerDe outputSerDe;

public JavaInstance(JavaInstanceConfig config) {
this(config, Thread.currentThread().getContextClassLoader());
Expand All @@ -77,14 +97,9 @@ public JavaInstance(JavaInstanceConfig config, ClassLoader clsLoader) {
this.context = new ContextImpl(config, log);

// create the serde
if (null == config.getFunctionConfig().getSerdeClassName()) {
this.serDe = null;
} else {
this.serDe = Reflections.createInstance(
config.getFunctionConfig().getSerdeClassName(),
SerDe.class,
clsLoader);
}
this.inputSerDe = initializeSerDe(config.getFunctionConfig().getInputSerdeClassName(), clsLoader);
this.outputSerDe = initializeSerDe(config.getFunctionConfig().getOutputSerdeClassName(), clsLoader);
// create the functions
if (object instanceof RequestHandler) {
requestHandler = (RequestHandler) object;
computeInputAndOutputTypesAndVerifySerDe();
Expand All @@ -102,12 +117,20 @@ private void computeInputAndOutputTypesAndVerifySerDe() {
verifySupportedType(typeArgs[0], false);
verifySupportedType(typeArgs[1], true);

Class<?>[] serdeTypeArgs = TypeResolver.resolveRawArguments(SerDe.class, serDe.getClass());
verifySupportedType(serdeTypeArgs[0], false);
Class<?>[] inputSerdeTypeArgs = TypeResolver.resolveRawArguments(SerDe.class, inputSerDe.getClass());
verifySupportedType(inputSerdeTypeArgs[0], false);
if (!typeArgs[0].equals(inputSerdeTypeArgs[0])) {
throw new RuntimeException("Inconsistent types found between function input type and input serde type: "
+ " function type = " + typeArgs[0] + ", serde type = " + inputSerdeTypeArgs[0]);
}

if (!typeArgs[0].equals(serdeTypeArgs[0])) {
throw new RuntimeException("Inconsistent types found between function class and serde class: "
+ " function type = " + typeArgs[0] + ", serde type = " + serdeTypeArgs[0]);
if (!Void.class.equals(typeArgs[1])) { // return type is not `Void.class`
Class<?>[] outputSerdeTypeArgs = TypeResolver.resolveRawArguments(SerDe.class, outputSerDe.getClass());
verifySupportedType(outputSerdeTypeArgs[0], false);
if (!typeArgs[1].equals(outputSerdeTypeArgs[0])) {
throw new RuntimeException("Inconsistent types found between function output type and output serde type: "
+ " function type = " + typeArgs[1] + ", serde type = " + outputSerdeTypeArgs[0]);
}
}
}

Expand All @@ -125,7 +148,7 @@ public JavaExecutionResult handleMessage(String messageId, String topicName, byt
Future<?> future = executorService.submit(() -> {
if (requestHandler != null) {
try {
Object input = serDe.deserialize(data);
Object input = inputSerDe.deserialize(data);
Object output = requestHandler.handleRequest(input, context);
executionResult.setResult(output);
} catch (Exception ex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,8 @@ public Void handleRequest(String input, Context context) throws Exception {

private static JavaInstanceConfig createInstanceConfig() {
FunctionConfig functionConfig = new FunctionConfig();
functionConfig.setSerdeClassName(Utf8StringSerDe.class.getName());
functionConfig.setInputSerdeClassName(Utf8StringSerDe.class.getName());
functionConfig.setOutputSerdeClassName(Utf8StringSerDe.class.getName());
JavaInstanceConfig instanceConfig = new JavaInstanceConfig();
instanceConfig.setFunctionConfig(functionConfig);
return instanceConfig;
Expand Down Expand Up @@ -175,13 +176,13 @@ public void testVoidOutputClasses() {
}

/**
* Verify that function type should be consistent with serde type.
* Verify that function input type should be consistent with input serde type.
*/
@Test
public void testInconsistentType() {
public void testInconsistentInputType() {
JavaInstanceConfig config = createInstanceConfig();
config.setTimeBudgetInMs(2000);
config.getFunctionConfig().setSerdeClassName(JavaSerDe.class.getName());
config.getFunctionConfig().setInputSerdeClassName(JavaSerDe.class.getName());

try {
new JavaInstance(
Expand All @@ -190,7 +191,27 @@ public void testInconsistentType() {
Thread.currentThread().getContextClassLoader());
fail("Should fail constructing java instance if function type is inconsistent with serde type");
} catch (RuntimeException re) {
assertTrue(re.getMessage().startsWith("Inconsistent types found between function class and serde class:"));
assertTrue(re.getMessage().startsWith("Inconsistent types found between function input type and input serde type:"));
}
}

/**
* Verify that function output type should be consistent with output serde type.
*/
@Test
public void testInconsistentOutputType() {
JavaInstanceConfig config = createInstanceConfig();
config.setTimeBudgetInMs(2000);
config.getFunctionConfig().setOutputSerdeClassName(JavaSerDe.class.getName());

try {
new JavaInstance(
config,
(RequestHandler<String, String>) (input, context) -> input + "-lambda",
Thread.currentThread().getContextClassLoader());
fail("Should fail constructing java instance if function type is inconsistent with serde type");
} catch (RuntimeException re) {
assertTrue(re.getMessage().startsWith("Inconsistent types found between function output type and output serde type:"));
}
}
}

0 comments on commit a548bc2

Please sign in to comment.