Skip to content

Commit

Permalink
Revert "Move all the components required for running worker (except r…
Browse files Browse the repository at this point in the history
…est endpoint) to WorkerService (apache#239)" (apache#241)

This reverts commit 8fd1fd1870753ea8fb2576873800380ac2cea3dd.
  • Loading branch information
jerrypeng authored and sijie committed Mar 4, 2018
1 parent cb48b09 commit 587da92
Show file tree
Hide file tree
Showing 4 changed files with 251 additions and 297 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,37 +18,211 @@
*/
package org.apache.pulsar.functions.worker;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.util.concurrent.AbstractService;

import java.io.IOException;
import java.net.URI;

import lombok.extern.slf4j.Slf4j;
import org.apache.distributedlog.DistributedLogConfiguration;
import org.apache.distributedlog.api.namespace.Namespace;
import org.apache.distributedlog.api.namespace.NamespaceBuilder;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.conf.InternalConfigurationData;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.functions.worker.rest.WorkerServer;

import javax.ws.rs.core.Response;

@Slf4j
public class Worker extends AbstractService {

// Configuration handed to the constructor; read throughout startup.
private final WorkerConfig workerConfig;
// Built from workerConfig in the constructor.
private final WorkerService workerService;
// Pulsar client created in doStartImpl(); null until then.
private PulsarClient client;
// Created in doStartImpl() from the config, client, dlog namespace and membership manager.
private FunctionRuntimeManager functionRuntimeManager;
// Created in doStartImpl() from the config, scheduler manager and client.
private FunctionMetaDataManager functionMetaDataManager;
// Created in doStartImpl(); the "snapshot" and "membership-monitor" tasks are registered on it.
private ClusterServiceCoordinator clusterServiceCoordinator;
// Thread created in doStartImpl() to run the WorkerServer.
private Thread serverThread;
// DistributedLog namespace built in doStartImpl(); per the comments there it stores function packages.
private Namespace dlogNamespace;
// Created in doStartImpl() from the config and client.
private MembershipManager membershipManager;
// Created in doStartImpl() from the config and client; the other managers are injected via setters.
private SchedulerManager schedulerManager;

/**
 * Creates a worker for the given configuration.
 *
 * <p>Only the configuration is stored and a {@link WorkerService} is built from it;
 * everything else is set up later, when the service is started.
 *
 * @param workerConfig the configuration this worker runs with
 */
public Worker(WorkerConfig workerConfig) {
    // The two assignments are independent of each other.
    this.workerService = new WorkerService(workerConfig);
    this.workerConfig = workerConfig;
}

/**
 * Starts the worker by delegating to {@link #doStartImpl()}.
 *
 * <p>A failure during startup is logged and then reported to the enclosing
 * {@code AbstractService} through {@code notifyFailed}, so the service transitions to
 * the failed state instead of staying in the starting state forever. Nothing is
 * rethrown from this method.
 */
@Override
protected void doStart() {
    try {
        doStartImpl();
    } catch (InterruptedException ie) {
        // Restore the interrupt flag so code further up the stack can still observe it.
        Thread.currentThread().interrupt();
        log.error("Interrupted at starting worker", ie);
        notifyFailed(ie);
    } catch (Throwable t) {
        // The logger already records the stack trace; no separate printStackTrace() is needed.
        log.error("Failed to start worker", t);
        notifyFailed(t);
    }
}

protected void doStartImpl() throws InterruptedException {
WorkerServer server = new WorkerServer(workerService);
log.info("Starting worker {}...", workerConfig.getWorkerId());
try {
log.info("Worker Configs: {}",new ObjectMapper().writerWithDefaultPrettyPrinter()
.writeValueAsString(workerConfig));
} catch (JsonProcessingException e) {
log.warn("Failed to print worker configs with error {}", e.getMessage(), e);
}

// initializing pulsar functions namespace
PulsarAdmin admin = Utils.getPulsarAdminClient(this.workerConfig.getPulsarWebServiceUrl());
InternalConfigurationData internalConf;
// make sure pulsar broker is up
log.info("Checking if broker at {} is up...", this.workerConfig.getPulsarWebServiceUrl());
int maxRetries = workerConfig.getInitialBrokerReconnectMaxRetries();
int retries = 0;
while (true) {
try {
admin.clusters().getClusters();
break;
} catch (PulsarAdminException e) {
log.warn("Retry to connect to Pulsar broker at {}", this.workerConfig.getPulsarWebServiceUrl());
if (retries >= maxRetries) {
log.error("Failed to connect to Pulsar broker at {} after {} attempts",
this.workerConfig.getPulsarFunctionsNamespace(), maxRetries);
throw new RuntimeException("Failed to connect to Pulsar broker");
}
retries ++;
Thread.sleep(1000);
}
}

// getting namespace policy
log.info("Initializing Pulsar Functions namespace...");
try {
try {
admin.namespaces().getPolicies(this.workerConfig.getPulsarFunctionsNamespace());
} catch (PulsarAdminException e) {
if (e.getStatusCode() == Response.Status.NOT_FOUND.getStatusCode()) {
// if not found than create
try {
admin.namespaces().createNamespace(this.workerConfig.getPulsarFunctionsNamespace());
} catch (PulsarAdminException e1) {
// prevent race condition with other workers starting up
if (e1.getStatusCode() != Response.Status.CONFLICT.getStatusCode()) {
log.error("Failed to create namespace {} for pulsar functions", this.workerConfig
.getPulsarFunctionsNamespace(), e1);
throw new RuntimeException(e1);
}
}
try {
admin.namespaces().setRetention(
this.workerConfig.getPulsarFunctionsNamespace(),
new RetentionPolicies(Integer.MAX_VALUE, Integer.MAX_VALUE));
} catch (PulsarAdminException e1) {
log.error("Failed to set retention policy for pulsar functions namespace", e);
throw new RuntimeException(e1);
}
} else {
log.error("Failed to get retention policy for pulsar function namespace {}",
this.workerConfig.getPulsarFunctionsNamespace(), e);
throw new RuntimeException(e);
}
}
try {
internalConf = admin.brokers().getInternalConfigurationData();
} catch (PulsarAdminException e) {
log.error("Failed to retrieve broker internal configuration", e);
throw new RuntimeException(e);
}
} finally {
admin.close();
}

// initialize the dlog namespace
// TODO: move this as part of pulsar cluster initialization later
URI dlogUri;
try {
dlogUri = Utils.initializeDlogNamespace(
internalConf.getZookeeperServers(),
internalConf.getLedgersRootPath());
} catch (IOException ioe) {
log.error("Failed to initialize dlog namespace at zookeeper {} for storing function packages",
internalConf.getZookeeperServers(), ioe);
throw new RuntimeException(ioe);
}

// create the dlog namespace for storing function packages
DistributedLogConfiguration dlogConf = Utils.getDlogConf(workerConfig);
try {
this.dlogNamespace = NamespaceBuilder.newBuilder()
.conf(dlogConf)
.clientId("function-worker-" + workerConfig.getWorkerId())
.uri(dlogUri)
.build();
} catch (Exception e) {
log.error("Failed to initialize dlog namespace {} for storing function packages",
dlogUri, e);
throw new RuntimeException(e);
}

// initialize the function metadata manager
try {

this.client = PulsarClient.create(this.workerConfig.getPulsarServiceUrl());
log.info("Created Pulsar client");

//create scheduler manager
this.schedulerManager = new SchedulerManager(this.workerConfig, this.client);

//create function meta data manager
this.functionMetaDataManager = new FunctionMetaDataManager(
this.workerConfig, this.schedulerManager, this.client);

//create membership manager
this.membershipManager = new MembershipManager(this.workerConfig, this.client);

// create function runtime manager
this.functionRuntimeManager = new FunctionRuntimeManager(
this.workerConfig, this.client, this.dlogNamespace, this.membershipManager);

// Setting references to managers in scheduler
this.schedulerManager.setFunctionMetaDataManager(this.functionMetaDataManager);
this.schedulerManager.setFunctionRuntimeManager(this.functionRuntimeManager);
this.schedulerManager.setMembershipManager(this.membershipManager);

// initialize function metadata manager
this.functionMetaDataManager.initialize();

// Starting cluster services
log.info("Start cluster services...");
this.clusterServiceCoordinator = new ClusterServiceCoordinator(
this.workerConfig.getWorkerId(),
membershipManager);
// start periodic snapshot routine
this.clusterServiceCoordinator.addTask(
"snapshot",
this.workerConfig.getSnapshotFreqMs(),
() -> functionMetaDataManager.snapshot());

this.clusterServiceCoordinator.addTask("membership-monitor",
this.workerConfig.getFailureCheckFreqMs(),
() -> membershipManager.checkFailures(
functionMetaDataManager, functionRuntimeManager, schedulerManager));

this.clusterServiceCoordinator.start();

// Start function runtime manager
this.functionRuntimeManager.start();

} catch (Exception e) {
log.error("Error Starting up in worker", e);
throw new RuntimeException(e);
}

WorkerServer server = new WorkerServer(
this.workerConfig, this.functionMetaDataManager, this.functionRuntimeManager,
this.membershipManager, this.dlogNamespace);
this.serverThread = new Thread(server, server.getThreadName());

log.info("Start worker server on port {}...", this.workerConfig.getWorkerPort());
Expand All @@ -65,6 +239,42 @@ protected void doStop() {
log.warn("Worker server thread is interrupted", e);
}
}
workerService.stop();
if (null != functionMetaDataManager) {
try {
functionMetaDataManager.close();
} catch (Exception e) {
log.warn("Failed to close function metadata manager", e);
}
}
if (null != functionRuntimeManager) {
try {
functionRuntimeManager.close();
} catch (Exception e) {
log.warn("Failed to close function runtime manager", e);
}
}
if (null != client) {
try {
client.close();
} catch (PulsarClientException e) {
log.warn("Failed to close pulsar client", e);
}
}

if (null != clusterServiceCoordinator) {
clusterServiceCoordinator.close();
}

if (null != membershipManager) {
try {
membershipManager.close();
} catch (PulsarClientException e) {
log.warn("Failed to close membership manager", e);
}
}

if (null != schedulerManager) {
schedulerManager.close();
}
}
}
Loading

0 comments on commit 587da92

Please sign in to comment.