Skip to content

Commit

Permalink
Instance (#3)
Browse files Browse the repository at this point in the history
* Create pulsar-functions module (#1)

* Create pulsar-functions module

* rename `sdk` package to `api`

* Added the first cut of the Java interface for Pulsar functions (#2)

* Added a simple Instance to run the exposed user interfaces

* Addressed pr comments plus added tests

* Remove unused import
  • Loading branch information
srkukarni authored and sijie committed Mar 4, 2018
1 parent 9a56774 commit a13d5ce
Show file tree
Hide file tree
Showing 8 changed files with 570 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

import org.slf4j.Logger;

interface Context {
public interface Context {
/**
* Returns the messageId of the message that we are processing
* This messageId is a stringified version of the actual MessageId
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import java.io.InputStream;
import java.io.OutputStream;

interface RawRequestHandler {
public interface RawRequestHandler {
/**
* Process the input coming as an input stream. The process function
* can optionally output any bytes into the Outputstream output.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
*/
package org.apache.pulsar.functions.api;

interface RequestHandler<I, O> {
public interface RequestHandler<I, O> {
/**
* Process the input.
* @return the output
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/**
* This class implements the Context interface exposed to the user.
*/
package org.apache.pulsar.functions.instance;

import org.apache.pulsar.functions.api.Context;
import org.slf4j.Logger;

class ContextImpl implements Context {
private JavaInstanceConfig config;
private Logger logger;

// Per Message related
private String messageId;
private String topicName;
private long startTime;

public ContextImpl(JavaInstanceConfig config, Logger logger) {
this.config = config;
this.logger = logger;
}

public void setCurrentMessageContext(String messageId, String topicName) {
this.messageId = messageId;
this.topicName = topicName;
this.startTime = System.currentTimeMillis();
}

@Override
public String getMessageId() {
return messageId;
}

@Override
public String getTopicName() {
return topicName;
}

@Override
public String getFunctionName() {
return config.getFunctionName();
}

@Override
public String getFunctionId() {
return config.getFunctionId();
}

@Override
public String getFunctionVersion() {
return config.getFunctionVersion();
}

@Override
public long getMemoryLimit() {
return config.getMaxMemory();
}

@Override
public long getTimeBudgetInMs() {
return config.getTimeBudgetInMs();
}

@Override
public long getRemainingTimeInMs() {
return getTimeBudgetInMs() - (System.currentTimeMillis() - startTime);
}

@Override
public Logger getLogger() {
return logger;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,259 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/**
* This is the Java Instance. This is started by the spawner using the JavaInstanceClient
* program if invoking via a process based invocation or using JavaInstance using a thread
* based invocation.
*/
package org.apache.pulsar.functions.instance;

import org.apache.pulsar.functions.api.RawRequestHandler;
import org.apache.pulsar.functions.api.RequestHandler;
import org.slf4j.Logger;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.lang.reflect.Method;
import java.lang.reflect.Type;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;

class JavaInstance {
enum SupportedTypes {
INTEGER,
STRING,
LONG,
DOUBLE,
BYTE,
SHORT,
FLOAT,
MAP,
LIST
}
private ContextImpl context;
private Logger logger;
private SupportedTypes inputType;
private SupportedTypes outputType;
private RequestHandler requestHandler;
private RawRequestHandler rawRequestHandler;
private ExecutorService executorService;
private ExecutionResult executionResult;

class ExecutionResult {
private Exception userException;
private TimeoutException timeoutException;
private Object resultValue;
private ByteArrayOutputStream outputStream;

public Exception getUserException() {
return userException;
}

public void setUserException(Exception userException) {
this.userException = userException;
}

public TimeoutException getTimeoutException() {
return timeoutException;
}

public void setTimeoutException(TimeoutException timeoutException) {
this.timeoutException = timeoutException;
}

public Object getResultValue() {
return resultValue;
}

public void setResultValue(Object resultValue) {
this.resultValue = resultValue;
}

public ByteArrayOutputStream getOutputStream() {
return outputStream;
}

public void setOutputStream(ByteArrayOutputStream outputStream) {
this.outputStream = outputStream;
}

public void reset() {
this.setUserException(null);
this.setTimeoutException(null);
this.setResultValue(null);
this.setOutputStream(null);
}
}

public static Object createObject(String userClassName) {
Object object;
try {
Class<?> clazz = Class.forName(userClassName);
object = clazz.newInstance();
} catch (ClassNotFoundException ex) {
throw new RuntimeException(ex + " User class must be in class path.");
} catch (InstantiationException ex) {
throw new RuntimeException(ex + " User class must be concrete.");
} catch (IllegalAccessException ex) {
throw new RuntimeException(ex + " User class must have a no-arg constructor.");
}
return object;
}

public JavaInstance(JavaInstanceConfig config, String userClassName, Logger logger) {
this(config, createObject(userClassName), logger);
}

public JavaInstance(JavaInstanceConfig config, Object object, Logger logger) {
this.context = new ContextImpl(config, logger);
this.logger = logger;
if (object instanceof RequestHandler) {
requestHandler = (RequestHandler) object;
computeInputAndOutputTypes();
} else if (object instanceof RawRequestHandler) {
rawRequestHandler = (RawRequestHandler) object;
} else {
throw new RuntimeException("User class must be either a Request or Raw Request Handler");
}

executorService = Executors.newFixedThreadPool(1);
this.executionResult = new ExecutionResult();
}

private void computeInputAndOutputTypes() {
try {
Method[] allMethods = requestHandler.getClass().getDeclaredMethods();
for (Method m : allMethods) {
if (!m.getName().equals("handleRequest")) {
continue;
}
inputType = computeSupportedType(m.getGenericParameterTypes()[0]);
outputType = computeSupportedType(m.getGenericReturnType());
return;
}
throw new RuntimeException("Strange that RequestHandler object does not have handleRequest method");
} catch (ArrayIndexOutOfBoundsException ex) {
throw new RuntimeException(ex + " Strange that requestHandle method does not take any arguments");
}
}

private SupportedTypes computeSupportedType(Type type) {
if (type.equals(Integer.TYPE)) {
return SupportedTypes.INTEGER;
} else if (type.equals(Double.TYPE)) {
return SupportedTypes.DOUBLE;
} else if (type.equals(Long.TYPE)) {
return SupportedTypes.LONG;
} else if (type.equals(String.class)) {
return SupportedTypes.STRING;
} else if (type.equals(Short.TYPE)) {
return SupportedTypes.SHORT;
} else if (type.equals(Byte.TYPE)) {
return SupportedTypes.BYTE;
} else if (type.equals(Float.TYPE)) {
return SupportedTypes.FLOAT;
} else if (type.equals(Map.class)) {
return SupportedTypes.MAP;
} else if (type.equals(List.class)) {
return SupportedTypes.LIST;
} else {
throw new RuntimeException("Non Basic types not yet supported");
}
}

public ExecutionResult handleMessage(String messageId, String topicName, byte[] data) {
context.setCurrentMessageContext(messageId, topicName);
executionResult.reset();
Future<?> future = executorService.submit(new Runnable() {
@Override
public void run() {
if (requestHandler != null) {
try {
Object obj = deserialize(data);
executionResult.setResultValue(requestHandler.handleRequest(obj, context));
} catch (Exception ex) {
executionResult.setUserException(ex);
}
} else if (rawRequestHandler != null) {
try {
ByteArrayInputStream inputStream = new ByteArrayInputStream(data);
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
rawRequestHandler.handleRequest(inputStream, outputStream, context);
executionResult.setOutputStream(outputStream);
} catch (Exception ex) {
executionResult.setUserException(ex);
}
}
}
});
try {
future.get(context.getTimeBudgetInMs(), TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
logger.error("handleMessage was interrupted");
executionResult.setUserException(e);
} catch (ExecutionException e) {
logger.error("handleMessage threw exception: " + e.getCause());
executionResult.setUserException(e);
} catch (TimeoutException e) {
future.cancel(true); // <-- interrupt the job
logger.error("handleMessage timed out");
executionResult.setTimeoutException(e);
}

return executionResult;
}

private Object deserialize(byte[] data) throws Exception {
switch (inputType) {
case INTEGER: {
return ByteBuffer.wrap(data).getInt();
}
case LONG: {
return ByteBuffer.wrap(data).getLong();
}
case DOUBLE: {
return ByteBuffer.wrap(data).getDouble();
}
case FLOAT: {
return ByteBuffer.wrap(data).getFloat();
}
case SHORT: {
return ByteBuffer.wrap(data).getShort();
}
case BYTE: {
return ByteBuffer.wrap(data).get();
}
case STRING: {
return new String(data);
}
case MAP:
case LIST: {
ByteArrayInputStream byteIn = new ByteArrayInputStream(data);
ObjectInputStream in = new ObjectInputStream(byteIn);
return in.readObject();
}
default: {
throw new RuntimeException("Unknown SupportedType " + inputType);
}
}
}
}
Loading

0 comments on commit a13d5ce

Please sign in to comment.