Skip to content

Commit

Permalink
DRILL-7191: RM blob persistence in ZooKeeper for distributed RM.
Browse files Browse the repository at this point in the history
Add stubs for QueryResourceManager exit and the wait/cleanup thread
Update MemoryCalculator to use DrillNode instead of DrillbitEndpoint
Support registering localbit resources in the cluster state blob using DrillbitStatusListener
Support ThrottledResourceManager via ResourceManagerBuilder
Add some E2E tests and RMStateBlobs tests along with some bug fixes
Fix TestRMConfigLoad tests to handle the case where ZKQueues are explicitly enabled
  • Loading branch information
sohami committed Apr 23, 2019
1 parent 2b6a91a commit 2800c57
Show file tree
Hide file tree
Showing 45 changed files with 1,544 additions and 575 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,6 @@
*/
package org.apache.drill.yarn.zk;

import static org.apache.drill.shaded.guava.com.google.common.collect.Collections2.transform;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.drill.shaded.guava.com.google.common.base.Throwables;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.curator.RetryPolicy;
Expand Down Expand Up @@ -55,8 +44,16 @@
import org.apache.drill.exec.coord.zk.ZkTransientStoreFactory;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint.State;
import org.apache.drill.shaded.guava.com.google.common.base.Throwables;

import org.apache.drill.shaded.guava.com.google.common.base.Function;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

/**
* Manages cluster coordination utilizing zookeeper.
Expand Down Expand Up @@ -87,8 +84,7 @@ public class ZKClusterCoordinator extends ClusterCoordinator {

private CuratorFramework curator;
private ServiceDiscovery<DrillbitEndpoint> discovery;
private volatile Collection<DrillbitEndpoint> endpoints = Collections
.emptyList();
private volatile Map<String, DrillbitEndpoint> endpointsMap = Collections.emptyMap();
private final String serviceName;
private final CountDownLatch initialConnection = new CountDownLatch(1);
private final TransientStoreFactory factory;
Expand Down Expand Up @@ -214,7 +210,7 @@ public void unregister(RegistrationHandle handle) {

@Override
public Collection<DrillbitEndpoint> getAvailableEndpoints() {
return this.endpoints;
return this.endpointsMap.values();
}

@Override
Expand All @@ -233,35 +229,33 @@ public <V> TransientStore<V> getOrCreateTransientStore(

private synchronized void updateEndpoints() {
try {
Collection<DrillbitEndpoint> newDrillbitSet = transform(
discovery.queryForInstances(serviceName),
new Function<ServiceInstance<DrillbitEndpoint>, DrillbitEndpoint>() {
@Override
public DrillbitEndpoint apply(
ServiceInstance<DrillbitEndpoint> input) {
return input.getPayload();
}
});
// All active bits in the Zookeeper
final Map<String, DrillbitEndpoint> UUIDtoEndpoints = discovery.queryForInstances(serviceName).stream()
.collect(Collectors.toConcurrentMap(ServiceInstance::getId, ServiceInstance::getPayload));

// set of newly dead bits : original bits - new set of active bits.
Set<DrillbitEndpoint> unregisteredBits = new HashSet<>(endpoints);
unregisteredBits.removeAll(newDrillbitSet);
Map<String, DrillbitEndpoint> unregisteredBits = new HashMap<>(endpointsMap);
for (Map.Entry<String, DrillbitEndpoint> newEndpoint : UUIDtoEndpoints.entrySet()) {
unregisteredBits.remove(newEndpoint.getKey());
}

// Set of newly live bits : new set of active bits - original bits.
Set<DrillbitEndpoint> registeredBits = new HashSet<>(newDrillbitSet);
registeredBits.removeAll(endpoints);
Map<String, DrillbitEndpoint> registeredBits = new HashMap<>(UUIDtoEndpoints);
for (Map.Entry<String, DrillbitEndpoint> newEndpoint : endpointsMap.entrySet()) {
registeredBits.remove(newEndpoint.getKey());
}

endpoints = newDrillbitSet;
endpointsMap = UUIDtoEndpoints;

if (logger.isDebugEnabled()) {
StringBuilder builder = new StringBuilder();
builder.append("Active drillbit set changed. Now includes ");
builder.append(newDrillbitSet.size());
builder.append(UUIDtoEndpoints.size());
builder.append(" total bits.");
if (!newDrillbitSet.isEmpty()) {
if (!UUIDtoEndpoints.isEmpty()) {
builder.append(" New active drillbits: \n");
}
for (DrillbitEndpoint bit : newDrillbitSet) {
for (DrillbitEndpoint bit : UUIDtoEndpoints.values()) {
builder.append('\t');
builder.append(bit.getAddress());
builder.append(':');
Expand All @@ -277,11 +271,13 @@ public DrillbitEndpoint apply(

// Notify the drillbit listener for newly unregistered bits.
if (!(unregisteredBits.isEmpty())) {
drillbitUnregistered(unregisteredBits);
drillbitUnregistered(unregisteredBits.entrySet().stream().collect(
Collectors.toMap(Map.Entry::getValue, Map.Entry::getKey)));
}
// Notify the drillbit listener for newly registered bits.
if (!(registeredBits.isEmpty())) {
drillbitRegistered(registeredBits);
drillbitRegistered(registeredBits.entrySet().stream().collect(
Collectors.toMap(Map.Entry::getValue, Map.Entry::getKey)));
}

} catch (Exception e) {
Expand Down
25 changes: 13 additions & 12 deletions drill-yarn/src/main/java/org/apache/drill/yarn/zk/ZKRegistry.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,24 @@
*/
package org.apache.drill.yarn.zk;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.work.foreman.DrillbitStatusListener;
import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting;
import org.apache.drill.yarn.appMaster.AMWrapperException;
import org.apache.drill.yarn.appMaster.EventContext;
import org.apache.drill.yarn.appMaster.Pollable;
import org.apache.drill.yarn.appMaster.RegistryHandler;
import org.apache.drill.yarn.appMaster.Task;
import org.apache.drill.yarn.appMaster.TaskLifecycleListener;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
* AM-specific implementation of a Drillbit registry backed by ZooKeeper.
* Listens to ZK events for registering a Drillbit and deregistering. Alerts the
Expand Down Expand Up @@ -297,11 +297,12 @@ public AckEvent(Task task, DrillbitEndpoint endpoint) {
* Callback from ZK to indicate that one or more drillbits have become
* registered. We handle registrations in a critical section, then alert the
* cluster controller outside the critical section.
* @param registeredDrillbitsUUID map from each newly registered Drillbit endpoint to its UUID
*/

@Override
public void drillbitRegistered(Set<DrillbitEndpoint> registeredDrillbits) {
List<AckEvent> updates = registerDrillbits(registeredDrillbits);
public void drillbitRegistered(Map<DrillbitEndpoint, String> registeredDrillbitsUUID) {
List<AckEvent> updates = registerDrillbits(registeredDrillbitsUUID.keySet());
for (AckEvent event : updates) {
if (event.task == null) {
registryHandler.reserveHost(event.endpoint.getAddress());
Expand Down Expand Up @@ -363,12 +364,12 @@ private AckEvent drillbitRegistered(DrillbitEndpoint dbe) {
* Callback from ZK to indicate that one or more drillbits have become
* deregistered from ZK. We handle the deregistrations in a critical section,
* but updates to the cluster controller outside of a critical section.
* @param unregisteredDrillbitsUUID map from each newly unregistered Drillbit endpoint to its UUID
*/

@Override
public void drillbitUnregistered(
Set<DrillbitEndpoint> unregisteredDrillbits) {
List<AckEvent> updates = unregisterDrillbits(unregisteredDrillbits);
public void drillbitUnregistered(Map<DrillbitEndpoint, String> unregisteredDrillbitsUUID) {
List<AckEvent> updates = unregisterDrillbits(unregisteredDrillbitsUUID.keySet());
for (AckEvent event : updates) {
registryHandler.completionAck(event.task, ENDPOINT_PROPERTY);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,6 @@
*/
package org.apache.drill.yarn.zk;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryNTimes;
Expand All @@ -41,6 +32,15 @@
import org.apache.drill.yarn.zk.ZKRegistry.DrillbitTracker;
import org.junit.Test;

import java.util.List;
import java.util.Map;
import java.util.Set;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

/**
* Tests for the AM version of the cluster coordinator. The AM version has no
* dependencies on the DoY config system or other systems, making it easy to
Expand Down Expand Up @@ -108,14 +108,13 @@ private class TestDrillbitStatusListener implements DrillbitStatusListener {
protected Set<DrillbitEndpoint> removed;

@Override
public void drillbitUnregistered(
Set<DrillbitEndpoint> unregisteredDrillbits) {
removed = unregisteredDrillbits;
public void drillbitUnregistered(Map<DrillbitEndpoint, String> unregisteredDrillbitsUUID) {
removed = unregisteredDrillbitsUUID.keySet();
}

@Override
public void drillbitRegistered(Set<DrillbitEndpoint> registeredDrillbits) {
added = registeredDrillbits;
public void drillbitRegistered(Map<DrillbitEndpoint, String> registeredDrillbitsUUID) {
added = registeredDrillbitsUUID.keySet();
}

public void clear() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@

import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/**
Expand Down Expand Up @@ -107,17 +106,17 @@ public interface RegistrationHandle {

/**
* Actions to take when a set of drillbits has newly become inactive.
* @param unregisteredBits
* @param unregisteredBitsUUID
*/
protected void drillbitUnregistered(Set<DrillbitEndpoint> unregisteredBits) {
protected void drillbitUnregistered(Map<DrillbitEndpoint, String> unregisteredBitsUUID) {
for (DrillbitStatusListener listener : listeners.keySet()) {
listener.drillbitUnregistered(unregisteredBits);
listener.drillbitUnregistered(unregisteredBitsUUID);
}
}

protected void drillbitRegistered(Set<DrillbitEndpoint> registeredBits) {
protected void drillbitRegistered(Map<DrillbitEndpoint, String> registeredBitsUUID) {
for (DrillbitStatusListener listener : listeners.keySet()) {
listener.drillbitRegistered(registeredBits);
listener.drillbitRegistered(registeredBitsUUID);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.curator.x.discovery.ServiceInstance;
import org.apache.curator.x.discovery.details.ServiceCacheListener;
import org.apache.drill.common.AutoCloseables;
import org.apache.drill.common.DrillNode;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.coord.ClusterCoordinator;
Expand All @@ -46,11 +47,8 @@

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
Expand All @@ -66,15 +64,13 @@ public class ZKClusterCoordinator extends ClusterCoordinator {

private CuratorFramework curator;
private ServiceDiscovery<DrillbitEndpoint> discovery;
private volatile Collection<DrillbitEndpoint> endpoints = Collections.emptyList();
private final String serviceName;
private final CountDownLatch initialConnection = new CountDownLatch(1);
private final TransientStoreFactory factory;
private ServiceCache<DrillbitEndpoint> serviceCache;
private DrillbitEndpoint endpoint;

// endpointsMap maps String UUID to Drillbit endpoints
private ConcurrentHashMap<String, DrillbitEndpoint> endpointsMap = new ConcurrentHashMap<>();
private Map<String, DrillbitEndpoint> endpointsMap = new ConcurrentHashMap<>();
private static final Pattern ZK_COMPLEX_STRING = Pattern.compile("(^.*?)/(.*)/([^/]*)$");

public ZKClusterCoordinator(DrillConfig config, String connect) {
Expand Down Expand Up @@ -220,7 +216,8 @@ public void unregister(RegistrationHandle handle) {
*/
public RegistrationHandle update(RegistrationHandle handle, State state) {
ZKRegistrationHandle h = (ZKRegistrationHandle) handle;
try {
final DrillbitEndpoint endpoint;
try {
endpoint = h.endpoint.toBuilder().setState(state).build();
ServiceInstance<DrillbitEndpoint> serviceInstance = ServiceInstance.<DrillbitEndpoint>builder()
.name(serviceName)
Expand All @@ -231,6 +228,7 @@ public RegistrationHandle update(RegistrationHandle handle, State state) {
Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
}
handle.setEndPoint(endpoint);
return handle;
}

Expand Down Expand Up @@ -282,46 +280,49 @@ public <V> TransientStore<V> getOrCreateTransientStore(final TransientStoreConfi
private synchronized void updateEndpoints() {
try {
// All active bits in the Zookeeper
final Map<String, DrillbitEndpoint> activeEndpointsUUID = discovery.queryForInstances(serviceName).stream()
final Map<String, DrillbitEndpoint> UUIDtoEndpoints = discovery.queryForInstances(serviceName).stream()
.collect(Collectors.toMap(ServiceInstance::getId, ServiceInstance::getPayload));

final Map<DrillbitEndpoint, String> UUIDtoEndpoints = activeEndpointsUUID.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getValue, Map.Entry::getKey));
final Map<DrillNode, String> activeEndpointsUUID = UUIDtoEndpoints.entrySet().stream()
.collect(Collectors.toMap(x -> DrillNode.create(x.getValue()), Map.Entry::getKey));

// set of newly dead bits : original bits - new set of active bits.
Set<DrillbitEndpoint> unregisteredBits = new HashSet<>();
final Map<DrillbitEndpoint, String> unregisteredBits = new HashMap<>();
// Set of newly live bits : new set of active bits - original bits.
Set<DrillbitEndpoint> registeredBits = new HashSet<>();
final Map<DrillbitEndpoint, String> registeredBits = new HashMap<>();


// Updates the endpoints map if there is a change in state of the endpoint or with the addition
// of new drillbit endpoints. Registered endpoints is set to newly live drillbit endpoints.
for (Map.Entry<String, DrillbitEndpoint> endpointToUUID : activeEndpointsUUID.entrySet()) {
endpointsMap.put(endpointToUUID.getKey(), endpointToUUID.getValue());
for (Map.Entry<String, DrillbitEndpoint> endpoint : UUIDtoEndpoints.entrySet()) {
// check if this bit is newly added bit
if (!endpointsMap.containsKey(endpoint.getKey())) {
registeredBits.put(endpoint.getValue(), endpoint.getKey());
}
endpointsMap.put(endpoint.getKey(), endpoint.getValue());
}

// Remove all the endpoints that are newly dead
for ( String bitUUID: endpointsMap.keySet()) {
if (!activeEndpointsUUID.containsKey(bitUUID)) {
if (!UUIDtoEndpoints.containsKey(bitUUID)) {
final DrillbitEndpoint unregisteredBit = endpointsMap.get(bitUUID);
unregisteredBits.add(unregisteredBit);

if (UUIDtoEndpoints.containsKey(unregisteredBit)) {
unregisteredBits.put(unregisteredBit, bitUUID);
final DrillNode unregisteredNode = DrillNode.create(unregisteredBit);
if (activeEndpointsUUID.containsKey(unregisteredNode)) {
logger.info("Drillbit registered again with different UUID. [Details: Address: {}, UserPort: {}," +
" PreviousUUID: {}, CurrentUUID: {}", unregisteredBit.getAddress(), unregisteredBit.getUserPort(),
bitUUID, UUIDtoEndpoints.get(unregisteredBit));
bitUUID, activeEndpointsUUID.get(unregisteredNode));
}
endpointsMap.remove(bitUUID);
}
}
endpoints = endpointsMap.values();
if (logger.isDebugEnabled()) {
StringBuilder builder = new StringBuilder();
builder.append("Active drillbit set changed. Now includes ");
builder.append(activeEndpointsUUID.size());
builder.append(UUIDtoEndpoints.size());
builder.append(" total bits. New active drillbits:\n");
builder.append("Address | User Port | Control Port | Data Port | Version | State\n");
for (DrillbitEndpoint bit: activeEndpointsUUID.values()) {
for (DrillbitEndpoint bit: UUIDtoEndpoints.values()) {
builder.append(bit.getAddress()).append(" | ");
builder.append(bit.getUserPort()).append(" | ");
builder.append(bit.getControlPort()).append(" | ");
Expand Down
Loading

0 comments on commit 2800c57

Please sign in to comment.