Skip to content

Commit

Permalink
Refactor serde classes into JavaInstanceRunnable (apache#115)
Browse files Browse the repository at this point in the history
  • Loading branch information
srkukarni authored and sijie committed Mar 4, 2018
1 parent 882d56f commit 8fcdd09
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,51 +64,30 @@ public class JavaInstance implements AutoCloseable {
Object.class
);

private static SerDe initializeSerDe(String serdeClassName, ClassLoader classLoader) {
if (null == serdeClassName) {
return null;
} else {
return Reflections.createInstance(
serdeClassName,
SerDe.class,
classLoader);
}
}

private ContextImpl context;
private RequestHandler requestHandler;
private RawRequestHandler rawRequestHandler;
private ExecutorService executorService;
private SerDe inputSerDe;
@Getter
private SerDe outputSerDe;

public JavaInstance(JavaInstanceConfig config) {
this(config, Thread.currentThread().getContextClassLoader());
}

public JavaInstance(JavaInstanceConfig config, ClassLoader clsLoader) {
public JavaInstance(JavaInstanceConfig config, ClassLoader clsLoader,
SerDe inputSerDe, SerDe outputSerDe) {
this(
config,
Reflections.createInstance(
config.getFunctionConfig().getClassName(),
clsLoader),
clsLoader);
clsLoader), inputSerDe, outputSerDe);
}

JavaInstance(JavaInstanceConfig config, Object object, ClassLoader clsLoader) {
JavaInstance(JavaInstanceConfig config, Object object, SerDe inputSerDe, SerDe outputSerDe) {
// TODO: cache logger instances by functions?
Logger instanceLog = LoggerFactory.getLogger("function-" + config.getFunctionConfig().getName());

this.context = new ContextImpl(config, instanceLog);

// create the serde
this.inputSerDe = initializeSerDe(config.getFunctionConfig().getInputSerdeClassName(), clsLoader);
this.outputSerDe = initializeSerDe(config.getFunctionConfig().getOutputSerdeClassName(), clsLoader);
// create the functions
if (object instanceof RequestHandler) {
requestHandler = (RequestHandler) object;
computeInputAndOutputTypesAndVerifySerDe();
computeInputAndOutputTypesAndVerifySerDe(inputSerDe, outputSerDe);
} else if (object instanceof RawRequestHandler) {
rawRequestHandler = (RawRequestHandler) object;
} else {
Expand All @@ -121,7 +100,7 @@ public JavaInstance(JavaInstanceConfig config, ClassLoader clsLoader) {
}
}

private void computeInputAndOutputTypesAndVerifySerDe() {
private void computeInputAndOutputTypesAndVerifySerDe(SerDe inputSerDe, SerDe outputSerDe) {
Class<?>[] typeArgs = TypeResolver.resolveRawArguments(RequestHandler.class, requestHandler.getClass());
verifySupportedType(typeArgs[0], false);
verifySupportedType(typeArgs[1], true);
Expand Down Expand Up @@ -151,14 +130,14 @@ private void verifySupportedType(Type type, boolean allowVoid) {
}
}

public JavaExecutionResult handleMessage(String messageId, String topicName, byte[] data) {
public JavaExecutionResult handleMessage(String messageId, String topicName, byte[] data, SerDe inputSerDe) {
context.setCurrentMessageContext(messageId, topicName);
JavaExecutionResult executionResult = new JavaExecutionResult();
if (executorService == null) {
return processMessage(executionResult, data);
return processMessage(executionResult, data, inputSerDe);
}
Future<?> future = executorService.submit(() -> {
return processMessage(executionResult, data);
return processMessage(executionResult, data, inputSerDe);
});
try {
future.get(context.getTimeBudgetInMs(), TimeUnit.MILLISECONDS);
Expand All @@ -177,7 +156,8 @@ public JavaExecutionResult handleMessage(String messageId, String topicName, byt
return executionResult;
}

private JavaExecutionResult processMessage(JavaExecutionResult executionResult, byte[] data) {
private JavaExecutionResult processMessage(JavaExecutionResult executionResult, byte[] data,
SerDe inputSerDe) {
if (requestHandler != null) {
try {
Object input = inputSerDe.deserialize(data);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.pulsar.functions.runtime.instance;

import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.logging.log4j.ThreadContext;
import org.apache.pulsar.client.api.*;
Expand All @@ -30,6 +31,7 @@
import org.apache.pulsar.functions.runtime.serde.SerDe;
import org.apache.pulsar.functions.stats.FunctionStats;
import org.apache.pulsar.functions.utils.FunctionConfigUtils;
import org.apache.pulsar.functions.utils.Reflections;

import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -58,6 +60,16 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
private Exception failureException;
private JavaInstance javaInstance;

private SerDe inputSerDe;
private SerDe outputSerDe;

@Getter
@Setter
private class InputMessage {
private Object object;
private String messageId;
}

// function stats
@Getter
private final FunctionStats stats;
Expand Down Expand Up @@ -97,7 +109,14 @@ public void run() {
loadJars();
// initialize the thread context
ThreadContext.put("function", FunctionConfigUtils.getFullyQualifiedName(javaInstanceConfig.getFunctionConfig()));
javaInstance = new JavaInstance(javaInstanceConfig);

ClassLoader clsLoader = Thread.currentThread().getContextClassLoader();

// create the serde
this.inputSerDe = initializeSerDe(javaInstanceConfig.getFunctionConfig().getInputSerdeClassName(), clsLoader);
this.outputSerDe = initializeSerDe(javaInstanceConfig.getFunctionConfig().getOutputSerdeClassName(), clsLoader);

javaInstance = new JavaInstance(javaInstanceConfig, clsLoader, inputSerDe, outputSerDe);

while (true) {
JavaExecutionResult result;
Expand All @@ -118,9 +137,10 @@ public void run() {
result = javaInstance.handleMessage(
convertMessageIdToString(msg.getMessageId()),
javaInstanceConfig.getFunctionConfig().getSourceTopic(),
msg.getData());
msg.getData(),
inputSerDe);
log.debug("Got result: {}", result.getResult());
processResult(msg, result, processAt, javaInstance.getOutputSerDe());
processResult(msg, result, processAt, outputSerDe);
}

javaInstance.close();
Expand Down Expand Up @@ -274,4 +294,16 @@ private static void addSystemMetrics(String metricName, double value, InstanceCo
private static String convertMessageIdToString(MessageId messageId) {
return messageId.toByteArray().toString();
}

private static SerDe initializeSerDe(String serdeClassName, ClassLoader clsLoader) {
if (null == serdeClassName) {
return null;
} else {
return Reflections.createInstance(
serdeClassName,
SerDe.class,
clsLoader);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,10 @@ public void testLongRunningFunction() throws Exception {
JavaInstanceConfig config = createInstanceConfig();
config.getLimitsConfig().setMaxTimeMs(2000);
JavaInstance instance = new JavaInstance(
config, new LongRunningHandler(), Thread.currentThread().getContextClassLoader());
config, new LongRunningHandler(), Utf8StringSerDe.of(), Utf8StringSerDe.of());
String testString = "ABC123";
JavaExecutionResult result = instance.handleMessage("1", "random", serialize(testString));
JavaExecutionResult result = instance.handleMessage("1", "random", serialize(testString),
Utf8StringSerDe.of());

assertNull(result.getUserException());
assertNotNull(result.getTimeoutException());
Expand All @@ -120,9 +121,10 @@ public void testLambda() {
JavaInstance instance = new JavaInstance(
config,
(RequestHandler<String, String>) (input, context) -> input + "-lambda",
Thread.currentThread().getContextClassLoader());
Utf8StringSerDe.of(), Utf8StringSerDe.of());
String testString = "ABC123";
JavaExecutionResult result = instance.handleMessage("1", "random", serialize(testString));
JavaExecutionResult result = instance.handleMessage("1", "random", serialize(testString),
Utf8StringSerDe.of());
assertNotNull(result.getResult());
assertEquals(new String(testString + "-lambda"), result.getResult());
}
Expand All @@ -136,7 +138,7 @@ public void testUnsupportedClasses() {
JavaInstanceConfig config = createInstanceConfig();
try {
new JavaInstance(
config, new UnsupportedHandler(), Thread.currentThread().getContextClassLoader());
config, new UnsupportedHandler(), Utf8StringSerDe.of(), Utf8StringSerDe.of());
assertFalse(true);
} catch (RuntimeException ex) {
// Good
Expand All @@ -153,7 +155,7 @@ public void testVoidInputClasses() {
JavaInstanceConfig config = createInstanceConfig();
try {
new JavaInstance(
config, new VoidInputHandler(), Thread.currentThread().getContextClassLoader());
config, new VoidInputHandler(), Utf8StringSerDe.of(), null);
assertFalse(true);
} catch (RuntimeException ex) {
// Good
Expand All @@ -170,9 +172,10 @@ public void testVoidOutputClasses() {
JavaInstanceConfig config = createInstanceConfig();
config.getLimitsConfig().setMaxTimeMs(2000);
JavaInstance instance = new JavaInstance(
config, new VoidOutputHandler(), Thread.currentThread().getContextClassLoader());
config, new VoidOutputHandler(), Utf8StringSerDe.of(), Utf8StringSerDe.of());
String testString = "ABC123";
JavaExecutionResult result = instance.handleMessage("1", "r", serialize(testString));
JavaExecutionResult result = instance.handleMessage("1", "r", serialize(testString),
Utf8StringSerDe.of());
assertNull(result.getUserException());
assertNull(result.getTimeoutException());
assertNull(result.getResult());
Expand All @@ -192,7 +195,7 @@ public void testInconsistentInputType() {
new JavaInstance(
config,
(RequestHandler<String, String>) (input, context) -> input + "-lambda",
Thread.currentThread().getContextClassLoader());
JavaSerDe.of(), Utf8StringSerDe.of());
fail("Should fail constructing java instance if function type is inconsistent with serde type");
} catch (RuntimeException re) {
assertTrue(re.getMessage().startsWith("Inconsistent types found between function input type and input serde type:"));
Expand All @@ -213,7 +216,7 @@ public void testInconsistentOutputType() {
new JavaInstance(
config,
(RequestHandler<String, String>) (input, context) -> input + "-lambda",
Thread.currentThread().getContextClassLoader());
Utf8StringSerDe.of(), JavaSerDe.of());
fail("Should fail constructing java instance if function type is inconsistent with serde type");
} catch (RuntimeException re) {
assertTrue(re.getMessage().startsWith("Inconsistent types found between function output type and output serde type:"));
Expand Down

0 comments on commit 8fcdd09

Please sign in to comment.