Skip to content

Commit

Permalink
deserialize load report based on load-manager (#338)
Browse files Browse the repository at this point in the history
  • Loading branch information
rdhabalia authored Apr 7, 2017
1 parent 4866563 commit 911a9b6
Show file tree
Hide file tree
Showing 7 changed files with 102 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import com.yahoo.pulsar.broker.stats.Metrics;
import com.yahoo.pulsar.common.naming.ServiceUnitId;
import com.yahoo.pulsar.common.policies.data.loadbalancer.LoadReport;
import com.yahoo.pulsar.common.policies.data.loadbalancer.ServiceLookupData;
import com.yahoo.pulsar.zookeeper.ZooKeeperCache.Deserializer;

/**
* LoadManager runs though set of load reports collected from different brokers and generates a recommendation of
Expand Down Expand Up @@ -57,6 +59,13 @@ public interface LoadManager {
* Generate the load report
*/
LoadReport generateLoadReport() throws Exception;

/**
* Returns {@link Deserializer} to deserialize load report
*
* @return
*/
Deserializer<? extends ServiceLookupData> getLoadReportDeserializer();

/**
* Set flag to force load report update
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import com.yahoo.pulsar.broker.PulsarServerException;
import com.yahoo.pulsar.broker.PulsarService;
import com.yahoo.pulsar.common.naming.ServiceUnitId;
import com.yahoo.pulsar.common.policies.data.loadbalancer.ServiceLookupData;
import com.yahoo.pulsar.zookeeper.ZooKeeperCache.Deserializer;

/**
* New proposal for a load manager interface which attempts to use more intuitive method names and provide a starting
Expand Down Expand Up @@ -88,4 +90,11 @@ public interface ModularLoadManager {
* As the leader broker, write bundle data aggregated from all brokers to ZooKeeper.
*/
void writeBundleDataOnZooKeeper();

/**
* Return :{@link Deserializer} to deserialize load-manager load report
*
* @return
*/
Deserializer<? extends ServiceLookupData> getLoadReportDeserializer();
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package com.yahoo.pulsar.broker.loadbalance.impl;

import static com.yahoo.pulsar.broker.admin.AdminResource.jsonMapper;

import java.io.IOException;
import java.net.URL;
import java.util.ArrayList;
Expand Down Expand Up @@ -65,6 +67,7 @@
import com.yahoo.pulsar.common.policies.data.loadbalancer.NamespaceBundleStats;
import com.yahoo.pulsar.common.policies.data.loadbalancer.SystemResourceUsage;
import com.yahoo.pulsar.common.util.ObjectMapperFactory;
import com.yahoo.pulsar.zookeeper.ZooKeeperCache.Deserializer;
import com.yahoo.pulsar.zookeeper.ZooKeeperCacheListener;
import com.yahoo.pulsar.zookeeper.ZooKeeperChildrenCache;
import com.yahoo.pulsar.zookeeper.ZooKeeperDataCache;
Expand Down Expand Up @@ -158,6 +161,9 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach

// ZooKeeper belonging to the pulsar service.
private ZooKeeper zkClient;

private static final Deserializer<LocalBrokerData> loadReportDeserializer = (key, content) -> jsonMapper()
.readValue(content, LocalBrokerData.class);

/**
* Initializes fields which do not depend on PulsarService. initialize(PulsarService) should subsequently be called.
Expand Down Expand Up @@ -579,6 +585,11 @@ public void writeBrokerDataOnZooKeeper() {
}
}

@Override
public Deserializer<LocalBrokerData> getLoadReportDeserializer() {
return loadReportDeserializer;
}

/**
* As the leader broker, write bundle data aggregated from all brokers to ZooKeeper.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import com.yahoo.pulsar.broker.stats.Metrics;
import com.yahoo.pulsar.common.naming.ServiceUnitId;
import com.yahoo.pulsar.common.policies.data.loadbalancer.LoadReport;
import com.yahoo.pulsar.common.policies.data.loadbalancer.ServiceLookupData;
import com.yahoo.pulsar.zookeeper.ZooKeeperCache.Deserializer;

/**
* Wrapper class allowing classes of instance ModularLoadManager to be compatible with the interface LoadManager.
Expand Down Expand Up @@ -103,4 +105,9 @@ public void writeLoadReportOnZookeeper() {
public void writeResourceQuotasToZooKeeper() {
loadManager.writeBundleDataOnZooKeeper();
}

@Override
public Deserializer<? extends ServiceLookupData> getLoadReportDeserializer() {
return loadManager.getLoadReportDeserializer();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,11 @@
import com.yahoo.pulsar.common.policies.data.loadbalancer.SystemResourceUsage;
import com.yahoo.pulsar.common.policies.data.loadbalancer.SystemResourceUsage.ResourceType;
import com.yahoo.pulsar.common.util.ObjectMapperFactory;
import com.yahoo.pulsar.zookeeper.ZooKeeperCache.Deserializer;
import com.yahoo.pulsar.zookeeper.ZooKeeperCacheListener;
import com.yahoo.pulsar.zookeeper.ZooKeeperChildrenCache;
import com.yahoo.pulsar.zookeeper.ZooKeeperDataCache;
import static com.yahoo.pulsar.broker.admin.AdminResource.jsonMapper;

public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListener<LoadReport> {

Expand Down Expand Up @@ -174,6 +176,8 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene
private long lastResourceUsageTimestamp = -1;
// flag to force update load report
private boolean forceLoadReportUpdate = false;
private static final Deserializer<LoadReport> loadReportDeserializer = (key, content) -> jsonMapper()
.readValue(content, LoadReport.class);

// Perform initializations which may be done without a PulsarService.
public SimpleLoadManagerImpl() {
Expand Down Expand Up @@ -313,6 +317,11 @@ public void disableBroker() throws Exception {
}
}

@Override
public Deserializer<LoadReport> getLoadReportDeserializer() {
return loadReportDeserializer;
}

public ZooKeeperChildrenCache getActiveBrokersCache() {
return this.availableActiveBrokers;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,10 +150,6 @@ private NamespaceBundle getFullBundle(NamespaceName fqnn) throws Exception {
return bundleFactory.getFullBundle(fqnn);
}

private static final Deserializer<ServiceLookupData> serviceLookupDataDeserializer = (key, content) ->
jsonMapper().readValue(content, ServiceLookupData.class);


public URL getWebServiceUrl(ServiceUnitId suName, boolean authoritative, boolean isRequestHttps, boolean readOnly)
throws Exception {
if (suName instanceof DestinationName) {
Expand Down Expand Up @@ -399,15 +395,15 @@ private void searchForCandidateBroker(NamespaceBundle bundle, CompletableFuture<
}
}

private CompletableFuture<LookupResult> createLookupResult(String candidateBroker) throws Exception {
protected CompletableFuture<LookupResult> createLookupResult(String candidateBroker) throws Exception {

CompletableFuture<LookupResult> lookupFuture = new CompletableFuture<>();
try {
checkArgument(StringUtils.isNotBlank(candidateBroker), "Lookup broker can't be null " + candidateBroker);
URI uri = new URI(candidateBroker);
String path = String.format("%s/%s:%s", LoadManager.LOADBALANCE_BROKERS_ROOT, uri.getHost(),
uri.getPort());
pulsar.getLocalZkCache().getDataAsync(path, serviceLookupDataDeserializer).thenAccept(reportData -> {
pulsar.getLocalZkCache().getDataAsync(path, pulsar.getLoadManager().get().getLoadReportDeserializer()).thenAccept(reportData -> {
if (reportData.isPresent()) {
ServiceLookupData lookupData = reportData.get();
lookupFuture.complete(new LookupResult(lookupData.getWebServiceUrl(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,19 @@

import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.net.URI;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.Stat;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
Expand All @@ -49,9 +54,18 @@
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
import com.google.common.collect.Lists;
import com.google.common.hash.Hashing;
import com.yahoo.pulsar.broker.LocalBrokerData;
import com.yahoo.pulsar.broker.PulsarServerException;
import com.yahoo.pulsar.broker.PulsarService;
import com.yahoo.pulsar.broker.loadbalance.LoadManager;
import com.yahoo.pulsar.broker.loadbalance.ModularLoadManager;
import com.yahoo.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl;
import com.yahoo.pulsar.broker.loadbalance.impl.ModularLoadManagerWrapper;
import com.yahoo.pulsar.broker.lookup.LookupResult;
import com.yahoo.pulsar.broker.service.BrokerTestBase;
import com.yahoo.pulsar.broker.service.Topic;
import com.yahoo.pulsar.broker.service.persistent.PersistentTopic;
Expand All @@ -62,9 +76,13 @@
import com.yahoo.pulsar.common.naming.NamespaceBundleFactory;
import com.yahoo.pulsar.common.naming.NamespaceBundles;
import com.yahoo.pulsar.common.naming.NamespaceName;
import com.yahoo.pulsar.common.naming.ServiceUnitId;
import com.yahoo.pulsar.common.policies.data.Policies;
import com.yahoo.pulsar.common.policies.data.loadbalancer.LoadReport;
import com.yahoo.pulsar.common.policies.data.loadbalancer.ServiceLookupData;
import com.yahoo.pulsar.common.util.ObjectMapperFactory;
import com.yahoo.pulsar.common.util.collections.ConcurrentOpenHashMap;
import com.yahoo.pulsar.zookeeper.ZooKeeperCache.Deserializer;

public class NamespaceServiceTest extends BrokerTestBase {

Expand Down Expand Up @@ -279,7 +297,43 @@ public CompletableFuture<Void> answer(InvocationOnMock invocation) throws Throwa
// ok
}
}


/**
* <pre>
* It verifies that namespace service deserialize the load-report based on load-manager which active.
* 1. write candidate1- load report using {@link LoadReport} which is used by SimpleLoadManagerImpl
* 2. Write candidate2- load report using {@link LocalBrokerData} which is used by ModularLoadManagerImpl
* 3. try to get Lookup Result based on active load-manager
* </pre>
* @throws Exception
*/
@Test
public void testLoadReportDeserialize() throws Exception {

final String candidateBroker1 = "http://localhost:8000";
final String candidateBroker2 = "http://localhost:3000";
LoadReport lr = new LoadReport(null, null, candidateBroker1, null);
LocalBrokerData ld = new LocalBrokerData(null, null, candidateBroker2, null);
URI uri1 = new URI(candidateBroker1);
URI uri2 = new URI(candidateBroker2);
String path1 = String.format("%s/%s:%s", LoadManager.LOADBALANCE_BROKERS_ROOT, uri1.getHost(), uri1.getPort());
String path2 = String.format("%s/%s:%s", LoadManager.LOADBALANCE_BROKERS_ROOT, uri2.getHost(), uri2.getPort());
ZkUtils.createFullPathOptimistic(pulsar.getZkClient(), path1,
ObjectMapperFactory.getThreadLocal().writeValueAsBytes(lr), ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL);
ZkUtils.createFullPathOptimistic(pulsar.getZkClient(), path2,
ObjectMapperFactory.getThreadLocal().writeValueAsBytes(ld), ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL);
LookupResult result1 = pulsar.getNamespaceService().createLookupResult(candidateBroker1).get();

// update to new load mananger
pulsar.getLoadManager().set(new ModularLoadManagerWrapper(new ModularLoadManagerImpl()));
LookupResult result2 = pulsar.getNamespaceService().createLookupResult(candidateBroker2).get();
Assert.assertEquals(result1.getLookupData().getBrokerUrl(), candidateBroker1);
Assert.assertEquals(result2.getLookupData().getBrokerUrl(), candidateBroker2);
System.out.println(result2);
}

@SuppressWarnings("unchecked")
private Pair<NamespaceBundles, List<NamespaceBundle>> splitBundles(NamespaceBundleFactory utilityFactory,
NamespaceName nsname, NamespaceBundles bundles, NamespaceBundle targetBundle) throws Exception {
Expand Down

0 comments on commit 911a9b6

Please sign in to comment.