From a548bc2e04890b3d478f4e25f0d1896ff5439ac3 Mon Sep 17 00:00:00 2001 From: Sijie Guo Date: Tue, 19 Dec 2017 18:38:23 -0800 Subject: [PATCH] Input and Output Serdes (#31) ### Motivation The input type can be different from the output type, so we need two serdes for each function. ### Modifications - add inputSerde and outputSerde - verify both input and output serde to make sure their types are consistent with function types ### Result Users can run functions with separate input and output serdes. --- pulsar-functions/run-examples.sh | 8 +- .../apache/pulsar/admin/cli/CmdFunctions.java | 14 ++- .../pulsar/functions/fs/FunctionConfig.java | 6 +- .../container/ThreadFunctionContainer.java | 2 +- .../runtime/instance/JavaInstance.java | 87 ++++++++++++------- .../runtime/instance/JavaInstanceTest.java | 31 +++++-- 6 files changed, 103 insertions(+), 45 deletions(-) diff --git a/pulsar-functions/run-examples.sh b/pulsar-functions/run-examples.sh index a9cb2dced046b..7ff482d70f910 100755 --- a/pulsar-functions/run-examples.sh +++ b/pulsar-functions/run-examples.sh @@ -17,4 +17,10 @@ # under the License. 
# -bin/pulsar-functions functions run --function-config conf/example.yml --sink-topic persistent://sample/standalone/ns1/test_result --source-topic persistent://sample/standalone/ns1/test_src --serde-classname org.apache.pulsar.functions.runtime.serde.Utf8StringSerDe --function-classpath `pwd`/api-examples/target/pulsar-functions-api-examples.jar +bin/pulsar-functions functions run \ + --function-config conf/example.yml \ + --sink-topic persistent://sample/standalone/ns1/test_result \ + --source-topic persistent://sample/standalone/ns1/test_src \ + --input-serde-classname org.apache.pulsar.functions.runtime.serde.Utf8StringSerDe \ + --output-serde-classname org.apache.pulsar.functions.runtime.serde.Utf8StringSerDe \ + --function-classpath `pwd`/api-examples/target/pulsar-functions-api-examples.jar diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java index a282cce6d46ef..994d308020183 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java @@ -51,8 +51,11 @@ abstract class FunctionsCommand extends CliCommand { @Parameter(names = "--sink-topic", description = "Output Topic Name\n") protected String sinkTopicName; - @Parameter(names = "--serde-classname", description = "SerDe\n") - protected String serDeClassName; + @Parameter(names = "--input-serde-classname", description = "Input SerDe\n") + protected String inputSerdeClassName; + + @Parameter(names = "--output-serde-classname", description = "Output SerDe\n") + protected String outputSerdeClassName; @Parameter(names = "--function-config", description = "Function Config\n") protected String fnConfigFile; @@ -77,8 +80,11 @@ void run() throws Exception { if (null != className) { functionConfig.setClassName(className); } - if (null != serDeClassName) { - 
functionConfig.setSerdeClassName(serDeClassName); + if (null != inputSerdeClassName) { + functionConfig.setInputSerdeClassName(inputSerdeClassName); + } + if (null != outputSerdeClassName) { + functionConfig.setOutputSerdeClassName(outputSerdeClassName); } if (null != jarFiles) { functionConfig.setJarFiles(jarFiles); diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/fs/FunctionConfig.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/fs/FunctionConfig.java index b49c64479614a..e5f5c85105647 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/fs/FunctionConfig.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/fs/FunctionConfig.java @@ -46,8 +46,10 @@ public class FunctionConfig { private String name; // function class name private String className; - // serde class name - private String serdeClassName; + // input serde class name + private String inputSerdeClassName; + // output serde class name + private String outputSerdeClassName; // function jar name private List jarFiles; // source topic diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/container/ThreadFunctionContainer.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/container/ThreadFunctionContainer.java index e67e918ff36c9..a196c6568ec46 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/container/ThreadFunctionContainer.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/container/ThreadFunctionContainer.java @@ -83,7 +83,7 @@ public void run() { result = javaInstance.handleMessage(payload.messageId, payload.topicName, payload.msgData); ExecutionResult actualResult = ExecutionResult.fromJavaResult(result, - javaInstance.getSerDe()); + javaInstance.getOutputSerDe()); payload.result.complete(actualResult); } catch (InterruptedException ie) { log.info("Function 
thread {} is interrupted", ie); diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/instance/JavaInstance.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/instance/JavaInstance.java index 986cb047e0ffb..e10a86e5f74f0 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/instance/JavaInstance.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/instance/JavaInstance.java @@ -18,47 +18,67 @@ */ package org.apache.pulsar.functions.runtime.instance; +import com.google.common.collect.Sets; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import lombok.Getter; +import lombok.extern.slf4j.Slf4j; import net.jodah.typetools.TypeResolver; import org.apache.pulsar.functions.api.RawRequestHandler; import org.apache.pulsar.functions.api.RequestHandler; import org.apache.pulsar.functions.runtime.serde.SerDe; import org.apache.pulsar.functions.utils.Reflections; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import java.io.*; import java.lang.reflect.Type; -import java.util.Arrays; import java.util.List; import java.util.Map; -import java.util.concurrent.*; /** * This is the Java Instance. This is started by the spawner using the JavaInstanceClient * program if invoking via a process based invocation or using JavaInstance using a thread * based invocation. 
*/ +@Slf4j public class JavaInstance { - private static final Logger log = LoggerFactory.getLogger(JavaInstance.class); - private static final List supportedInputTypes = Arrays.asList( - Integer.TYPE, - Double.TYPE, - Long.TYPE, - String.class, - Short.TYPE, - Byte.TYPE, - Float.TYPE, - Map.class, - List.class, - Object.class + + private static final Set supportedInputTypes = Sets.newHashSet( + Integer.TYPE, + Double.TYPE, + Long.TYPE, + String.class, + Short.TYPE, + Byte.TYPE, + Float.TYPE, + Map.class, + List.class, + Object.class ); + + private static SerDe initializeSerDe(String serdeClassName, ClassLoader classLoader) { + if (null == serdeClassName) { + return null; + } else { + return Reflections.createInstance( + serdeClassName, + SerDe.class, + classLoader); + } + } + private ContextImpl context; private RequestHandler requestHandler; private RawRequestHandler rawRequestHandler; private ExecutorService executorService; + private SerDe inputSerDe; @Getter - private SerDe serDe; + private SerDe outputSerDe; public JavaInstance(JavaInstanceConfig config) { this(config, Thread.currentThread().getContextClassLoader()); @@ -77,14 +97,9 @@ public JavaInstance(JavaInstanceConfig config, ClassLoader clsLoader) { this.context = new ContextImpl(config, log); // create the serde - if (null == config.getFunctionConfig().getSerdeClassName()) { - this.serDe = null; - } else { - this.serDe = Reflections.createInstance( - config.getFunctionConfig().getSerdeClassName(), - SerDe.class, - clsLoader); - } + this.inputSerDe = initializeSerDe(config.getFunctionConfig().getInputSerdeClassName(), clsLoader); + this.outputSerDe = initializeSerDe(config.getFunctionConfig().getOutputSerdeClassName(), clsLoader); + // create the functions if (object instanceof RequestHandler) { requestHandler = (RequestHandler) object; computeInputAndOutputTypesAndVerifySerDe(); @@ -102,12 +117,20 @@ private void computeInputAndOutputTypesAndVerifySerDe() { verifySupportedType(typeArgs[0], false); 
verifySupportedType(typeArgs[1], true); - Class[] serdeTypeArgs = TypeResolver.resolveRawArguments(SerDe.class, serDe.getClass()); - verifySupportedType(serdeTypeArgs[0], false); + Class[] inputSerdeTypeArgs = TypeResolver.resolveRawArguments(SerDe.class, inputSerDe.getClass()); + verifySupportedType(inputSerdeTypeArgs[0], false); + if (!typeArgs[0].equals(inputSerdeTypeArgs[0])) { + throw new RuntimeException("Inconsistent types found between function input type and input serde type: " + + " function type = " + typeArgs[0] + ", serde type = " + inputSerdeTypeArgs[0]); + } - if (!typeArgs[0].equals(serdeTypeArgs[0])) { - throw new RuntimeException("Inconsistent types found between function class and serde class: " - + " function type = " + typeArgs[0] + ", serde type = " + serdeTypeArgs[0]); + if (!Void.class.equals(typeArgs[1])) { // return type is not `Void.class` + Class[] outputSerdeTypeArgs = TypeResolver.resolveRawArguments(SerDe.class, outputSerDe.getClass()); + verifySupportedType(outputSerdeTypeArgs[0], false); + if (!typeArgs[1].equals(outputSerdeTypeArgs[0])) { + throw new RuntimeException("Inconsistent types found between function output type and output serde type: " + + " function type = " + typeArgs[1] + ", serde type = " + outputSerdeTypeArgs[0]); + } } } @@ -125,7 +148,7 @@ public JavaExecutionResult handleMessage(String messageId, String topicName, byt Future future = executorService.submit(() -> { if (requestHandler != null) { try { - Object input = serDe.deserialize(data); + Object input = inputSerDe.deserialize(data); Object output = requestHandler.handleRequest(input, context); executionResult.setResult(output); } catch (Exception ex) { diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/instance/JavaInstanceTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/instance/JavaInstanceTest.java index 9049b337b6a70..5b8aa126feafe 100644 --- 
a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/instance/JavaInstanceTest.java +++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/instance/JavaInstanceTest.java @@ -82,7 +82,8 @@ public Void handleRequest(String input, Context context) throws Exception { private static JavaInstanceConfig createInstanceConfig() { FunctionConfig functionConfig = new FunctionConfig(); - functionConfig.setSerdeClassName(Utf8StringSerDe.class.getName()); + functionConfig.setInputSerdeClassName(Utf8StringSerDe.class.getName()); + functionConfig.setOutputSerdeClassName(Utf8StringSerDe.class.getName()); JavaInstanceConfig instanceConfig = new JavaInstanceConfig(); instanceConfig.setFunctionConfig(functionConfig); return instanceConfig; @@ -175,13 +176,13 @@ public void testVoidOutputClasses() { } /** - * Verify that function type should be consistent with serde type. + * Verify that function input type should be consistent with input serde type. */ @Test - public void testInconsistentType() { + public void testInconsistentInputType() { JavaInstanceConfig config = createInstanceConfig(); config.setTimeBudgetInMs(2000); - config.getFunctionConfig().setSerdeClassName(JavaSerDe.class.getName()); + config.getFunctionConfig().setInputSerdeClassName(JavaSerDe.class.getName()); try { new JavaInstance( @@ -190,7 +191,27 @@ public void testInconsistentType() { Thread.currentThread().getContextClassLoader()); fail("Should fail constructing java instance if function type is inconsistent with serde type"); } catch (RuntimeException re) { - assertTrue(re.getMessage().startsWith("Inconsistent types found between function class and serde class:")); + assertTrue(re.getMessage().startsWith("Inconsistent types found between function input type and input serde type:")); + } + } + + /** + * Verify that function output type should be consistent with output serde type. 
+ */ + @Test + public void testInconsistentOutputType() { + JavaInstanceConfig config = createInstanceConfig(); + config.setTimeBudgetInMs(2000); + config.getFunctionConfig().setOutputSerdeClassName(JavaSerDe.class.getName()); + + try { + new JavaInstance( + config, + (RequestHandler) (input, context) -> input + "-lambda", + Thread.currentThread().getContextClassLoader()); + fail("Should fail constructing java instance if function type is inconsistent with serde type"); + } catch (RuntimeException re) { + assertTrue(re.getMessage().startsWith("Inconsistent types found between function output type and output serde type:")); } } }