Skip to content

Commit

Permalink
Use Spawner inside Java Instance rather than using JavaInstance direc…
Browse files Browse the repository at this point in the history
…tly (apache#169)
  • Loading branch information
srkukarni authored and sijie committed Mar 4, 2018
1 parent 2f6167e commit efce272
Showing 1 changed file with 37 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,15 @@
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.*;
import org.apache.pulsar.functions.fs.LimitsConfig;
import org.apache.pulsar.functions.proto.Function.FunctionConfig;
import org.apache.pulsar.functions.proto.InstanceCommunication;
import org.apache.pulsar.functions.runtime.container.InstanceConfig;
import org.apache.pulsar.functions.runtime.functioncache.FunctionCacheManagerImpl;
import org.apache.pulsar.functions.runtime.container.ThreadFunctionContainerFactory;
import org.apache.pulsar.functions.proto.InstanceControlGrpc;
import org.apache.pulsar.functions.utils.FunctionConfigUtils;
import org.apache.pulsar.functions.runtime.spawner.Spawner;

import java.lang.reflect.Type;
import java.util.HashMap;
import java.util.Map;

/**
Expand Down Expand Up @@ -100,9 +99,6 @@ public class JavaInstanceMain {
@Parameter(names = "--auto_ack", description = "Enable Auto Acking?\n")
protected String autoAck = "true";

private Thread fnThread;
private JavaInstanceRunnable javaInstanceRunnable;

private Server server;

public JavaInstanceMain() { }
Expand Down Expand Up @@ -143,46 +139,48 @@ public void start() throws Exception {
Map<String, String> userConfigMap = new Gson().fromJson(userConfig, type);
functionConfigBuilder.putAllUserConfig(userConfigMap);
}
instanceConfig.setFunctionConfig(functionConfigBuilder.build());
FunctionConfig functionConfig = functionConfigBuilder.build();
instanceConfig.setFunctionConfig(functionConfig);

LimitsConfig limitsConfig = new LimitsConfig();
limitsConfig.setMaxBufferedTuples(maxBufferedTuples);

log.info("Starting JavaInstanceMain with {}", instanceConfig);
this.javaInstanceRunnable = new JavaInstanceRunnable(
instanceConfig,
ThreadFunctionContainerFactory containerFactory = new ThreadFunctionContainerFactory(
"LocalRunnerThreadGroup",
maxBufferedTuples,
new FunctionCacheManagerImpl(),
jarFile,
PulsarClient.create(pulsarServiceUrl, new ClientConfiguration()),
pulsarServiceUrl,
stateStorageServiceUrl);
this.fnThread = new Thread(javaInstanceRunnable, FunctionConfigUtils.getFullyQualifiedName(instanceConfig.getFunctionConfig()));

Spawner spawner = Spawner.createSpawner(
functionConfig,
limitsConfig,
jarFile,
containerFactory,
null,
0);

server = ServerBuilder.forPort(port)
.addService(new InstanceControlImpl(javaInstanceRunnable))
.addService(new InstanceControlImpl(spawner))
.build()
.start();
log.info("JaveInstance Server started, listening on " + port);
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
// Use stderr here since the logger may have been reset by its JVM shutdown hook.
System.err.println("*** shutting down gRPC server since JVM is shutting down");
try {
JavaInstanceMain.this.stop();
server.shutdown();
spawner.close();
} catch (Exception ex) {
System.err.println(ex);
}
System.err.println("*** server shut down");
}
});
fnThread.start();
server.awaitTermination();
}

private void stop() throws Exception {
if (server != null) {
server.shutdown();
}
fnThread.interrupt();
fnThread.join();
log.info("Starting spawner");
spawner.start();
spawner.join();
log.info("Spawner quit, shutting down JavaInstance");
server.shutdown();
}

public static void main(String[] args) throws Exception {
Expand All @@ -196,23 +194,21 @@ public static void main(String[] args) throws Exception {
}

static class InstanceControlImpl extends InstanceControlGrpc.InstanceControlImplBase {
private JavaInstanceRunnable javaInstanceRunnable;
private Spawner spawner;

public InstanceControlImpl(JavaInstanceRunnable javaInstanceRunnable) {
this.javaInstanceRunnable = javaInstanceRunnable;
public InstanceControlImpl(Spawner spawner) {
this.spawner = spawner;
}

@Override
public void getFunctionStatus(Empty request, StreamObserver<InstanceCommunication.FunctionStatus> responseObserver) {
InstanceCommunication.FunctionStatus.Builder builder = javaInstanceRunnable.getFunctionStatus();
String failureException = javaInstanceRunnable.getFailureException() != null
? javaInstanceRunnable.getFailureException().getMessage() : "";
InstanceCommunication.FunctionStatus response = builder
.setRunning(true)
.setFailureException(failureException)
.build();
responseObserver.onNext(response);
responseObserver.onCompleted();
try {
InstanceCommunication.FunctionStatus response = spawner.getFunctionStatus().get();
responseObserver.onNext(response);
responseObserver.onCompleted();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
}

0 comments on commit efce272

Please sign in to comment.