From 587da921dcab0f2f02038d0e44b18e9fd083f61c Mon Sep 17 00:00:00 2001 From: Boyang Jerry Peng Date: Thu, 22 Feb 2018 13:43:50 -0800 Subject: [PATCH] Revert "Move all the components required for running worker (except rest endpoint) to WorkerService (#239)" (#241) This reverts commit 8fd1fd1870753ea8fb2576873800380ac2cea3dd. --- .../pulsar/functions/worker/Worker.java | 226 +++++++++++++- .../functions/worker/WorkerService.java | 280 ------------------ .../functions/worker/rest/Resources.java | 2 +- .../functions/worker/rest/WorkerServer.java | 40 ++- 4 files changed, 251 insertions(+), 297 deletions(-) delete mode 100644 pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java index 08dad103e3e29..8e1a53a09062f 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java @@ -18,37 +18,211 @@ */ package org.apache.pulsar.functions.worker; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.util.concurrent.AbstractService; +import java.io.IOException; +import java.net.URI; + import lombok.extern.slf4j.Slf4j; +import org.apache.distributedlog.DistributedLogConfiguration; +import org.apache.distributedlog.api.namespace.Namespace; +import org.apache.distributedlog.api.namespace.NamespaceBuilder; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.common.conf.InternalConfigurationData; +import org.apache.pulsar.common.policies.data.RetentionPolicies; import org.apache.pulsar.functions.worker.rest.WorkerServer; +import javax.ws.rs.core.Response; + @Slf4j public class Worker extends AbstractService { private final WorkerConfig workerConfig; - private final WorkerService workerService; + private PulsarClient client; + private FunctionRuntimeManager functionRuntimeManager; + private FunctionMetaDataManager functionMetaDataManager; + private ClusterServiceCoordinator clusterServiceCoordinator; private Thread serverThread; + private Namespace dlogNamespace; + private MembershipManager membershipManager; + private SchedulerManager schedulerManager; public Worker(WorkerConfig workerConfig) { this.workerConfig = workerConfig; - this.workerService = new WorkerService(workerConfig); } @Override protected void doStart() { try { doStartImpl(); - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - log.error("Interrupted at starting worker", ie); } catch (Throwable t) { - log.error("Failed to start worker", t); + t.printStackTrace(); } } protected void doStartImpl() throws InterruptedException { - WorkerServer server = new WorkerServer(workerService); + log.info("Starting worker {}...", workerConfig.getWorkerId()); + try { + log.info("Worker Configs: {}",new ObjectMapper().writerWithDefaultPrettyPrinter() + .writeValueAsString(workerConfig)); + } catch (JsonProcessingException e) { + log.warn("Failed to print worker configs with error {}", e.getMessage(), e); + } + + // initializing pulsar functions namespace + PulsarAdmin admin = Utils.getPulsarAdminClient(this.workerConfig.getPulsarWebServiceUrl()); + InternalConfigurationData internalConf; + // make sure pulsar broker is up + log.info("Checking if broker at {} is up...", this.workerConfig.getPulsarWebServiceUrl()); + int maxRetries = workerConfig.getInitialBrokerReconnectMaxRetries(); + int retries = 0; + while (true) { + try { + admin.clusters().getClusters(); + break; + } catch (PulsarAdminException e) { + log.warn("Retry to connect to Pulsar broker at {}", this.workerConfig.getPulsarWebServiceUrl()); + if (retries >= maxRetries) { + log.error("Failed to connect to Pulsar broker at {} after {} attempts", + this.workerConfig.getPulsarFunctionsNamespace(), maxRetries); + throw new RuntimeException("Failed to connect to Pulsar broker"); + } + retries ++; + Thread.sleep(1000); + } + } + + // getting namespace policy + log.info("Initializing Pulsar Functions namespace..."); + try { + try { + admin.namespaces().getPolicies(this.workerConfig.getPulsarFunctionsNamespace()); + } catch (PulsarAdminException e) { + if (e.getStatusCode() == Response.Status.NOT_FOUND.getStatusCode()) { + // if not found than create + try { + admin.namespaces().createNamespace(this.workerConfig.getPulsarFunctionsNamespace()); + } catch (PulsarAdminException e1) { + // prevent race condition with other workers starting up + if (e1.getStatusCode() != Response.Status.CONFLICT.getStatusCode()) { + log.error("Failed to create namespace {} for pulsar functions", this.workerConfig + .getPulsarFunctionsNamespace(), e1); + throw new RuntimeException(e1); + } + } + try { + admin.namespaces().setRetention( + this.workerConfig.getPulsarFunctionsNamespace(), + new RetentionPolicies(Integer.MAX_VALUE, Integer.MAX_VALUE)); + } catch (PulsarAdminException e1) { + log.error("Failed to set retention policy for pulsar functions namespace", e); + throw new RuntimeException(e1); + } + } else { + log.error("Failed to get retention policy for pulsar function namespace {}", + this.workerConfig.getPulsarFunctionsNamespace(), e); + throw new RuntimeException(e); + } + } + try { + internalConf = admin.brokers().getInternalConfigurationData(); + } catch (PulsarAdminException e) { + log.error("Failed to retrieve broker internal configuration", e); + throw new RuntimeException(e); + } + } finally { + admin.close(); + } + + // initialize the dlog namespace + // TODO: move this as part of pulsar cluster initialization later + URI dlogUri; + try { + dlogUri = Utils.initializeDlogNamespace( + internalConf.getZookeeperServers(), + internalConf.getLedgersRootPath()); + } catch (IOException ioe) { + log.error("Failed to initialize dlog namespace at zookeeper {} for storing function packages", + internalConf.getZookeeperServers(), ioe); + throw new RuntimeException(ioe); + } + + // create the dlog namespace for storing function packages + DistributedLogConfiguration dlogConf = Utils.getDlogConf(workerConfig); + try { + this.dlogNamespace = NamespaceBuilder.newBuilder() + .conf(dlogConf) + .clientId("function-worker-" + workerConfig.getWorkerId()) + .uri(dlogUri) + .build(); + } catch (Exception e) { + log.error("Failed to initialize dlog namespace {} for storing function packages", + dlogUri, e); + throw new RuntimeException(e); + } + + // initialize the function metadata manager + try { + + this.client = PulsarClient.create(this.workerConfig.getPulsarServiceUrl()); + log.info("Created Pulsar client"); + + //create scheduler manager + this.schedulerManager = new SchedulerManager(this.workerConfig, this.client); + + //create function meta data manager + this.functionMetaDataManager = new FunctionMetaDataManager( + this.workerConfig, this.schedulerManager, this.client); + + //create membership manager + this.membershipManager = new MembershipManager(this.workerConfig, this.client); + + // create function runtime manager + this.functionRuntimeManager = new FunctionRuntimeManager( + this.workerConfig, this.client, this.dlogNamespace, this.membershipManager); + + // Setting references to managers in scheduler + this.schedulerManager.setFunctionMetaDataManager(this.functionMetaDataManager); + this.schedulerManager.setFunctionRuntimeManager(this.functionRuntimeManager); + this.schedulerManager.setMembershipManager(this.membershipManager); + + // initialize function metadata manager + this.functionMetaDataManager.initialize(); + + // Starting cluster services + log.info("Start cluster services..."); + this.clusterServiceCoordinator = new ClusterServiceCoordinator( + this.workerConfig.getWorkerId(), + membershipManager); + // start periodic snapshot routine + this.clusterServiceCoordinator.addTask( + "snapshot", + this.workerConfig.getSnapshotFreqMs(), + () -> functionMetaDataManager.snapshot()); + + this.clusterServiceCoordinator.addTask("membership-monitor", + this.workerConfig.getFailureCheckFreqMs(), + () -> membershipManager.checkFailures( + functionMetaDataManager, functionRuntimeManager, schedulerManager)); + + this.clusterServiceCoordinator.start(); + + // Start function runtime manager + this.functionRuntimeManager.start(); + + } catch (Exception e) { + log.error("Error Starting up in worker", e); + throw new RuntimeException(e); + } + + WorkerServer server = new WorkerServer( + this.workerConfig, this.functionMetaDataManager, this.functionRuntimeManager, + this.membershipManager, this.dlogNamespace); this.serverThread = new Thread(server, server.getThreadName()); log.info("Start worker server on port {}...", this.workerConfig.getWorkerPort()); @@ -65,6 +239,42 @@ protected void doStop() { log.warn("Worker server thread is interrupted", e); } } - workerService.stop(); + if (null != functionMetaDataManager) { + try { + functionMetaDataManager.close(); + } catch (Exception e) { + log.warn("Failed to close function metadata manager", e); + } + } + if (null != functionRuntimeManager) { + try { + functionRuntimeManager.close(); + } catch (Exception e) { + log.warn("Failed to close function runtime manager", e); + } + } + if (null != client) { + try { + client.close(); + } catch (PulsarClientException e) { + log.warn("Failed to close pulsar client", e); + } + } + + if (null != clusterServiceCoordinator) { + clusterServiceCoordinator.close(); + } + + if (null != membershipManager) { + try { + membershipManager.close(); + } catch (PulsarClientException e) { + log.warn("Failed to close membership manager", e); + } + } + + if (null != schedulerManager) { + schedulerManager.close(); + } } } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java deleted file mode 100644 index 2c4adbf2a6163..0000000000000 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java +++ /dev/null @@ -1,280 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.functions.worker; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import java.io.IOException; -import java.net.URI; -import javax.ws.rs.core.Response; -import lombok.Getter; -import lombok.extern.slf4j.Slf4j; -import org.apache.distributedlog.DistributedLogConfiguration; -import org.apache.distributedlog.api.namespace.Namespace; -import org.apache.distributedlog.api.namespace.NamespaceBuilder; -import org.apache.pulsar.client.admin.PulsarAdmin; -import org.apache.pulsar.client.admin.PulsarAdminException; -import org.apache.pulsar.client.api.PulsarClient; -import org.apache.pulsar.client.api.PulsarClientException; -import org.apache.pulsar.common.conf.InternalConfigurationData; -import org.apache.pulsar.common.policies.data.RetentionPolicies; -import org.apache.pulsar.functions.worker.rest.BaseApiResource; -import org.apache.pulsar.functions.worker.rest.Resources; -import org.eclipse.jetty.servlet.ServletContextHandler; -import org.eclipse.jetty.servlet.ServletHolder; -import org.glassfish.jersey.server.ResourceConfig; -import org.glassfish.jersey.servlet.ServletContainer; - -/** - * A service component contains everything to run a worker except rest server. - */ -@Slf4j -@Getter -public class WorkerService { - - private final WorkerConfig workerConfig; - - private PulsarClient client; - private FunctionRuntimeManager functionRuntimeManager; - private FunctionMetaDataManager functionMetaDataManager; - private ClusterServiceCoordinator clusterServiceCoordinator; - private Namespace dlogNamespace; - private MembershipManager membershipManager; - private SchedulerManager schedulerManager; - - public WorkerService(WorkerConfig workerConfig) { - this.workerConfig = workerConfig; - } - - public ServletContextHandler newServletContextHandler(String contextPath) { - final ResourceConfig config = new ResourceConfig(Resources.get()); - final ServletContextHandler contextHandler = - new ServletContextHandler(ServletContextHandler.NO_SESSIONS); - - contextHandler.setAttribute(BaseApiResource.ATTRIBUTE_WORKER_CONFIG, this.workerConfig); - contextHandler.setAttribute(BaseApiResource.ATTRIBUTE_WORKER_FUNCTION_STATE_MANAGER, this.functionMetaDataManager); - contextHandler.setAttribute(BaseApiResource.ATTRIBUTE_WORKER_FUNCTION_RUNTIME_MANAGER, this.functionRuntimeManager); - contextHandler.setAttribute(BaseApiResource.ATTRIBUTE_MEMBERSHIP_MANAGER, this.membershipManager); - contextHandler.setAttribute(BaseApiResource.ATTRIBUTE_WORKER_DLOG_NAMESPACE, this.dlogNamespace); - contextHandler.setContextPath(contextPath); - - final ServletHolder apiServlet = - new ServletHolder(new ServletContainer(config)); - contextHandler.addServlet(apiServlet, "/*"); - - return contextHandler; - } - - public void start() throws InterruptedException { - log.info("Starting worker {}...", workerConfig.getWorkerId()); - try { - log.info("Worker Configs: {}", new ObjectMapper().writerWithDefaultPrettyPrinter() - .writeValueAsString(workerConfig)); - } catch (JsonProcessingException e) { - log.warn("Failed to print worker configs with error {}", e.getMessage(), e); - } - - // initializing pulsar functions namespace - PulsarAdmin admin = Utils.getPulsarAdminClient(this.workerConfig.getPulsarWebServiceUrl()); - InternalConfigurationData internalConf; - // make sure pulsar broker is up - log.info("Checking if broker at {} is up...", this.workerConfig.getPulsarWebServiceUrl()); - int maxRetries = workerConfig.getInitialBrokerReconnectMaxRetries(); - int retries = 0; - while (true) { - try { - admin.clusters().getClusters(); - break; - } catch (PulsarAdminException e) { - log.warn("Retry to connect to Pulsar broker at {}", this.workerConfig.getPulsarWebServiceUrl()); - if (retries >= maxRetries) { - log.error("Failed to connect to Pulsar broker at {} after {} attempts", - this.workerConfig.getPulsarFunctionsNamespace(), maxRetries); - throw new RuntimeException("Failed to connect to Pulsar broker"); - } - retries ++; - Thread.sleep(1000); - } - } - - // getting namespace policy - log.info("Initializing Pulsar Functions namespace..."); - try { - try { - admin.namespaces().getPolicies(this.workerConfig.getPulsarFunctionsNamespace()); - } catch (PulsarAdminException e) { - if (e.getStatusCode() == Response.Status.NOT_FOUND.getStatusCode()) { - // if not found than create - try { - admin.namespaces().createNamespace(this.workerConfig.getPulsarFunctionsNamespace()); - } catch (PulsarAdminException e1) { - // prevent race condition with other workers starting up - if (e1.getStatusCode() != Response.Status.CONFLICT.getStatusCode()) { - log.error("Failed to create namespace {} for pulsar functions", this.workerConfig - .getPulsarFunctionsNamespace(), e1); - throw new RuntimeException(e1); - } - } - try { - admin.namespaces().setRetention( - this.workerConfig.getPulsarFunctionsNamespace(), - new RetentionPolicies(Integer.MAX_VALUE, Integer.MAX_VALUE)); - } catch (PulsarAdminException e1) { - log.error("Failed to set retention policy for pulsar functions namespace", e); - throw new RuntimeException(e1); - } - } else { - log.error("Failed to get retention policy for pulsar function namespace {}", - this.workerConfig.getPulsarFunctionsNamespace(), e); - throw new RuntimeException(e); - } - } - try { - internalConf = admin.brokers().getInternalConfigurationData(); - } catch (PulsarAdminException e) { - log.error("Failed to retrieve broker internal configuration", e); - throw new RuntimeException(e); - } - } finally { - admin.close(); - } - - // initialize the dlog namespace - // TODO: move this as part of pulsar cluster initialization later - URI dlogUri; - try { - dlogUri = Utils.initializeDlogNamespace( - internalConf.getZookeeperServers(), - internalConf.getLedgersRootPath()); - } catch (IOException ioe) { - log.error("Failed to initialize dlog namespace at zookeeper {} for storing function packages", - internalConf.getZookeeperServers(), ioe); - throw new RuntimeException(ioe); - } - - // create the dlog namespace for storing function packages - DistributedLogConfiguration dlogConf = Utils.getDlogConf(workerConfig); - try { - this.dlogNamespace = NamespaceBuilder.newBuilder() - .conf(dlogConf) - .clientId("function-worker-" + workerConfig.getWorkerId()) - .uri(dlogUri) - .build(); - } catch (Exception e) { - log.error("Failed to initialize dlog namespace {} for storing function packages", - dlogUri, e); - throw new RuntimeException(e); - } - - // initialize the function metadata manager - try { - - this.client = PulsarClient.create(this.workerConfig.getPulsarServiceUrl()); - log.info("Created Pulsar client"); - - //create scheduler manager - this.schedulerManager = new SchedulerManager(this.workerConfig, this.client); - - //create function meta data manager - this.functionMetaDataManager = new FunctionMetaDataManager( - this.workerConfig, this.schedulerManager, this.client); - - //create membership manager - this.membershipManager = new MembershipManager(this.workerConfig, this.client); - - // create function runtime manager - this.functionRuntimeManager = new FunctionRuntimeManager( - this.workerConfig, this.client, this.dlogNamespace, this.membershipManager); - - // Setting references to managers in scheduler - this.schedulerManager.setFunctionMetaDataManager(this.functionMetaDataManager); - this.schedulerManager.setFunctionRuntimeManager(this.functionRuntimeManager); - this.schedulerManager.setMembershipManager(this.membershipManager); - - // initialize function metadata manager - this.functionMetaDataManager.initialize(); - - // Starting cluster services - log.info("Start cluster services..."); - this.clusterServiceCoordinator = new ClusterServiceCoordinator( - this.workerConfig.getWorkerId(), - membershipManager); - // start periodic snapshot routine - this.clusterServiceCoordinator.addTask( - "snapshot", - this.workerConfig.getSnapshotFreqMs(), - () -> functionMetaDataManager.snapshot()); - - this.clusterServiceCoordinator.addTask("membership-monitor", - this.workerConfig.getFailureCheckFreqMs(), - () -> membershipManager.checkFailures( - functionMetaDataManager, functionRuntimeManager, schedulerManager)); - - this.clusterServiceCoordinator.start(); - - // Start function runtime manager - this.functionRuntimeManager.start(); - - } catch (Exception e) { - log.error("Error Starting up in worker", e); - throw new RuntimeException(e); - } - } - - public void stop() { - if (null != functionMetaDataManager) { - try { - functionMetaDataManager.close(); - } catch (Exception e) { - log.warn("Failed to close function metadata manager", e); - } - } - if (null != functionRuntimeManager) { - try { - functionRuntimeManager.close(); - } catch (Exception e) { - log.warn("Failed to close function runtime manager", e); - } - } - if (null != client) { - try { - client.close(); - } catch (PulsarClientException e) { - log.warn("Failed to close pulsar client", e); - } - } - - if (null != clusterServiceCoordinator) { - clusterServiceCoordinator.close(); - } - - if (null != membershipManager) { - try { - membershipManager.close(); - } catch (PulsarClientException e) { - log.warn("Failed to close membership manager", e); - } - } - - if (null != schedulerManager) { - schedulerManager.close(); - } - } - -} diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/Resources.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/Resources.java index e1a840347f295..ec83975b4bbf2 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/Resources.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/Resources.java @@ -31,7 +31,7 @@ public final class Resources { private Resources() { } - public static Set> get() { + static Set> get() { return new HashSet<>(getClasses()); } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java index 96e898e1a92b6..d031fd2572afe 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java @@ -18,19 +18,31 @@ */ package org.apache.pulsar.functions.worker.rest; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.apache.distributedlog.api.namespace.Namespace; +import org.apache.pulsar.functions.worker.FunctionMetaDataManager; +import org.apache.pulsar.functions.worker.FunctionRuntimeManager; +import org.apache.pulsar.functions.worker.MembershipManager; import org.apache.pulsar.functions.worker.WorkerConfig; -import org.apache.pulsar.functions.worker.WorkerService; import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.servlet.ServletContextHandler; +import org.eclipse.jetty.servlet.ServletHolder; +import org.glassfish.jersey.server.ResourceConfig; +import org.glassfish.jersey.servlet.ServletContainer; import java.net.BindException; import java.net.URI; @Slf4j +@RequiredArgsConstructor public class WorkerServer implements Runnable { private final WorkerConfig workerConfig; - private final WorkerService workerService; + private final FunctionMetaDataManager functionMetaDataManager; + private final FunctionRuntimeManager functionRuntimeManager; + private final MembershipManager membershipManager; + private final Namespace dlogNamespace; private static String getErrorMessage(Server server, int port, Exception ex) { if (ex instanceof BindException) { @@ -41,16 +53,28 @@ private static String getErrorMessage(Server server, int port, Exception ex) { return ex.getMessage(); } - public WorkerServer(WorkerService workerService) { - this.workerConfig = workerService.getWorkerConfig(); - this.workerService = workerService; - } - @Override public void run() { final Server server = new Server(this.workerConfig.getWorkerPort()); - server.setHandler(workerService.newServletContextHandler("/")); + final ResourceConfig config = new ResourceConfig(Resources.get()); + + final ServletContextHandler contextHandler = + new ServletContextHandler(ServletContextHandler.NO_SESSIONS); + + contextHandler.setAttribute(BaseApiResource.ATTRIBUTE_WORKER_CONFIG, this.workerConfig); + contextHandler.setAttribute(BaseApiResource.ATTRIBUTE_WORKER_FUNCTION_STATE_MANAGER, this.functionMetaDataManager); + contextHandler.setAttribute(BaseApiResource.ATTRIBUTE_WORKER_FUNCTION_RUNTIME_MANAGER, this.functionRuntimeManager); + contextHandler.setAttribute(BaseApiResource.ATTRIBUTE_MEMBERSHIP_MANAGER, this.membershipManager); + contextHandler.setAttribute(BaseApiResource.ATTRIBUTE_WORKER_DLOG_NAMESPACE, this.dlogNamespace); + contextHandler.setContextPath("/"); + + server.setHandler(contextHandler); + + final ServletHolder apiServlet = + new ServletHolder(new ServletContainer(config)); + + contextHandler.addServlet(apiServlet, "/*"); try { server.start();