diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/instance/JavaInstanceMain.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/instance/JavaInstanceMain.java index 02029148e911b..fc127cf2d5ee4 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/instance/JavaInstanceMain.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/instance/JavaInstanceMain.java @@ -29,16 +29,15 @@ import io.grpc.ServerBuilder; import io.grpc.stub.StreamObserver; import lombok.extern.slf4j.Slf4j; -import org.apache.pulsar.client.api.*; +import org.apache.pulsar.functions.fs.LimitsConfig; import org.apache.pulsar.functions.proto.Function.FunctionConfig; import org.apache.pulsar.functions.proto.InstanceCommunication; import org.apache.pulsar.functions.runtime.container.InstanceConfig; -import org.apache.pulsar.functions.runtime.functioncache.FunctionCacheManagerImpl; +import org.apache.pulsar.functions.runtime.container.ThreadFunctionContainerFactory; import org.apache.pulsar.functions.proto.InstanceControlGrpc; -import org.apache.pulsar.functions.utils.FunctionConfigUtils; +import org.apache.pulsar.functions.runtime.spawner.Spawner; import java.lang.reflect.Type; -import java.util.HashMap; import java.util.Map; /** @@ -100,9 +99,6 @@ public class JavaInstanceMain { @Parameter(names = "--auto_ack", description = "Enable Auto Acking?\n") protected String autoAck = "true"; - private Thread fnThread; - private JavaInstanceRunnable javaInstanceRunnable; - private Server server; public JavaInstanceMain() { } @@ -143,20 +139,28 @@ public void start() throws Exception { Map userConfigMap = new Gson().fromJson(userConfig, type); functionConfigBuilder.putAllUserConfig(userConfigMap); } - instanceConfig.setFunctionConfig(functionConfigBuilder.build()); + FunctionConfig functionConfig = functionConfigBuilder.build(); + instanceConfig.setFunctionConfig(functionConfig); + + LimitsConfig limitsConfig = new LimitsConfig(); + limitsConfig.setMaxBufferedTuples(maxBufferedTuples); - log.info("Starting JavaInstanceMain with {}", instanceConfig); - this.javaInstanceRunnable = new JavaInstanceRunnable( - instanceConfig, + ThreadFunctionContainerFactory containerFactory = new ThreadFunctionContainerFactory( + "LocalRunnerThreadGroup", maxBufferedTuples, - new FunctionCacheManagerImpl(), - jarFile, - PulsarClient.create(pulsarServiceUrl, new ClientConfiguration()), + pulsarServiceUrl, stateStorageServiceUrl); - this.fnThread = new Thread(javaInstanceRunnable, FunctionConfigUtils.getFullyQualifiedName(instanceConfig.getFunctionConfig())); + + Spawner spawner = Spawner.createSpawner( + functionConfig, + limitsConfig, + jarFile, + containerFactory, + null, + 0); server = ServerBuilder.forPort(port) - .addService(new InstanceControlImpl(javaInstanceRunnable)) + .addService(new InstanceControlImpl(spawner)) .build() .start(); log.info("JaveInstance Server started, listening on " + port); @@ -164,25 +168,19 @@ public void start() throws Exception { @Override public void run() { // Use stderr here since the logger may have been reset by its JVM shutdown hook. - System.err.println("*** shutting down gRPC server since JVM is shutting down"); try { - JavaInstanceMain.this.stop(); + server.shutdown(); + spawner.close(); } catch (Exception ex) { System.err.println(ex); } - System.err.println("*** server shut down"); } }); - fnThread.start(); - server.awaitTermination(); - } - - private void stop() throws Exception { - if (server != null) { - server.shutdown(); - } - fnThread.interrupt(); - fnThread.join(); + log.info("Starting spawner"); + spawner.start(); + spawner.join(); + log.info("Spawner quit, shutting down JavaInstance"); + server.shutdown(); } public static void main(String[] args) throws Exception { @@ -196,23 +194,21 @@ public static void main(String[] args) throws Exception { } static class InstanceControlImpl extends InstanceControlGrpc.InstanceControlImplBase { - private JavaInstanceRunnable javaInstanceRunnable; + private Spawner spawner; - public InstanceControlImpl(JavaInstanceRunnable javaInstanceRunnable) { - this.javaInstanceRunnable = javaInstanceRunnable; + public InstanceControlImpl(Spawner spawner) { + this.spawner = spawner; } @Override public void getFunctionStatus(Empty request, StreamObserver responseObserver) { - InstanceCommunication.FunctionStatus.Builder builder = javaInstanceRunnable.getFunctionStatus(); - String failureException = javaInstanceRunnable.getFailureException() != null - ? javaInstanceRunnable.getFailureException().getMessage() : ""; - InstanceCommunication.FunctionStatus response = builder - .setRunning(true) - .setFailureException(failureException) - .build(); - responseObserver.onNext(response); - responseObserver.onCompleted(); + try { + InstanceCommunication.FunctionStatus response = spawner.getFunctionStatus().get(); + responseObserver.onNext(response); + responseObserver.onCompleted(); + } catch (Exception e) { + throw new RuntimeException(e); + } } } }