Skip to content

Commit

Permalink
ISSUE apache#1495: Option to enforce minNumRacksPerWriteQuorum
Browse files Browse the repository at this point in the history
Provide an option to enforce the minNumRacksPerWriteQuorum guarantee.
When the option is enabled and a bookie that satisfies the guarantee
cannot be found, the APIs in RackawareEnsemblePlacementPolicy throw
BKNotEnoughBookiesException.
  • Loading branch information
reddycharan committed Jun 19, 2018
1 parent c7aa989 commit 488b202
Show file tree
Hide file tree
Showing 6 changed files with 711 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@
import java.util.ArrayList;
import java.util.Set;

import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
import org.apache.bookkeeper.client.ITopologyAwareEnsemblePlacementPolicy.Ensemble;
import org.apache.bookkeeper.client.ITopologyAwareEnsemblePlacementPolicy.Predicate;
import org.apache.bookkeeper.client.TopologyAwareEnsemblePlacementPolicy.BookieNode;
import org.apache.bookkeeper.common.annotation.InterfaceAudience;
import org.apache.bookkeeper.common.annotation.InterfaceStability;
import org.apache.bookkeeper.net.BookieSocketAddress;
Expand Down Expand Up @@ -109,15 +113,63 @@ ArrayList<BookieSocketAddress> newEnsemble(
* predicate to apply
* @param ensemble
* ensemble
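* @param fallbackToRandom
* whether to fall back to choosing a random bookie from the whole cluster when no
* bookie can be selected from networkLoc; if false, the failure is propagated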
* @return the selected bookie.
* @throws BKException.BKNotEnoughBookiesException
*/
T selectFromNetworkLocation(String networkLoc,
Set<Node> excludeBookies,
Predicate<T> predicate,
Ensemble<T> ensemble)
Ensemble<T> ensemble,
boolean fallbackToRandom)
throws BKException.BKNotEnoughBookiesException;

/**
* Select a node from cluster excluding excludeBookies and bookie nodes of
* excludeRacks. If there isn't a BookieNode excluding those racks and
* nodes, then if fallbackToRandom is set to true then pick a random node
* from cluster just excluding excludeBookies.
*
* <p>NOTE(review): this overload is declared with {@code Predicate<BookieNode>} and
* {@code Ensemble<BookieNode>} while it returns {@code T}; the single-rack overload
* declared just above uses {@code Predicate<T>} and {@code Ensemble<T>}. Confirm
* whether the concrete {@code BookieNode} type is intended here.
*
* @param excludeRacks
* network locations (racks) whose bookie nodes must not be selected
* @param excludeBookies
* nodes that must not be selected
* @param predicate
* predicate to apply when choosing a node
* @param ensemble
* ensemble the selected node is being chosen for
* @param fallbackToRandom
* if true, a random node excluding only excludeBookies is picked when no node
* outside excludeRacks and excludeBookies is available; if false, no such
* fallback is attempted
* @return the selected node
* @throws BKException.BKNotEnoughBookiesException
* if no node could be selected
*/
T selectFromNetworkLocation(Set<String> excludeRacks,
Set<Node> excludeBookies,
Predicate<BookieNode> predicate,
Ensemble<BookieNode> ensemble,
boolean fallbackToRandom)
throws BKException.BKNotEnoughBookiesException;

/**
* Select a node from networkLoc rack excluding excludeBookies. If there
* isn't any node in 'networkLoc', then it will try to get a node from
* cluster excluding excludeRacks and excludeBookies. If fallbackToRandom is
* set to true then it will get a random bookie from cluster excluding
* excludeBookies if it couldn't find a bookie
*
* <p>NOTE(review): this overload is declared with {@code Predicate<BookieNode>} and
* {@code Ensemble<BookieNode>} while it returns {@code T}; the single-rack overload
* declared earlier in this interface uses {@code Predicate<T>} and
* {@code Ensemble<T>}. Confirm whether the concrete {@code BookieNode} type is
* intended here.
*
* @param networkLoc
* network location (rack) to try first
* @param excludeRacks
* network locations (racks) to avoid when selecting from the rest of the cluster
* @param excludeBookies
* nodes that must not be selected
* @param predicate
* predicate to apply when choosing a node
* @param ensemble
* ensemble the selected node is being chosen for
* @param fallbackToRandom
* if true, a random bookie excluding only excludeBookies is picked as the last
* resort; if false, no such fallback is attempted
* @return the selected node
* @throws BKNotEnoughBookiesException
* if no node could be selected
*/
T selectFromNetworkLocation(String networkLoc,
Set<String> excludeRacks,
Set<Node> excludeBookies,
Predicate<BookieNode> predicate,
Ensemble<BookieNode> ensemble,
boolean fallbackToRandom)
throws BKNotEnoughBookiesException;

/**
* Handle bookies that left.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@
import java.util.Map;
import java.util.Set;

import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
import org.apache.bookkeeper.client.ITopologyAwareEnsemblePlacementPolicy.Ensemble;
import org.apache.bookkeeper.client.ITopologyAwareEnsemblePlacementPolicy.Predicate;
import org.apache.bookkeeper.client.TopologyAwareEnsemblePlacementPolicy.BookieNode;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.net.DNSToSwitchMapping;
import org.apache.bookkeeper.net.Node;
Expand Down Expand Up @@ -54,18 +58,19 @@ protected RackawareEnsemblePlacementPolicy initialize(DNSToSwitchMapping dnsReso
boolean isWeighted,
int maxWeightMultiple,
int minNumRacksPerWriteQuorum,
StatsLogger statsLogger) {
boolean enforceMinNumRacksPerWriteQuorum,
StatsLogger statsLogger) {
if (stabilizePeriodSeconds > 0) {
super.initialize(dnsResolver, timer, reorderReadsRandom, 0, reorderThresholdPendingRequests, isWeighted,
maxWeightMultiple, minNumRacksPerWriteQuorum, statsLogger);
maxWeightMultiple, minNumRacksPerWriteQuorum, enforceMinNumRacksPerWriteQuorum, statsLogger);
slave = new RackawareEnsemblePlacementPolicyImpl(enforceDurability);
slave.initialize(dnsResolver, timer, reorderReadsRandom, stabilizePeriodSeconds,
reorderThresholdPendingRequests, isWeighted, maxWeightMultiple,
minNumRacksPerWriteQuorum, statsLogger);
reorderThresholdPendingRequests, isWeighted, maxWeightMultiple, minNumRacksPerWriteQuorum,
enforceMinNumRacksPerWriteQuorum, statsLogger);
} else {
super.initialize(dnsResolver, timer, reorderReadsRandom, stabilizePeriodSeconds,
reorderThresholdPendingRequests, isWeighted, maxWeightMultiple,
minNumRacksPerWriteQuorum, statsLogger);
reorderThresholdPendingRequests, isWeighted, maxWeightMultiple, minNumRacksPerWriteQuorum,
enforceMinNumRacksPerWriteQuorum, statsLogger);
slave = null;
}
return this;
Expand Down Expand Up @@ -171,15 +176,60 @@ public BookieNode selectFromNetworkLocation(
String networkLoc,
Set<Node> excludeBookies,
Predicate<BookieNode> predicate,
Ensemble<BookieNode> ensemble)
Ensemble<BookieNode> ensemble,
boolean fallbackToRandom)
throws BKException.BKNotEnoughBookiesException {
try {
return super.selectFromNetworkLocation(networkLoc, excludeBookies, predicate, ensemble);
return super.selectFromNetworkLocation(networkLoc, excludeBookies, predicate, ensemble,
fallbackToRandom);
} catch (BKException.BKNotEnoughBookiesException bnebe) {
if (slave == null) {
throw bnebe;
} else {
return slave.selectFromNetworkLocation(networkLoc, excludeBookies, predicate, ensemble);
return slave.selectFromNetworkLocation(networkLoc, excludeBookies, predicate, ensemble,
fallbackToRandom);
}
}
}

/**
 * Selects a bookie outside {@code excludeRacks} and {@code excludeBookies}, delegating
 * to the parent implementation first.
 *
 * <p>If the parent throws {@link BKException.BKNotEnoughBookiesException} and a
 * {@code slave} policy exists, the same request is retried on the slave. {@code slave}
 * is only created by {@code initialize} when {@code stabilizePeriodSeconds > 0};
 * otherwise it is {@code null} and the parent's exception is rethrown.
 *
 * @param excludeRacks racks whose bookies must not be selected
 * @param excludeBookies bookies that must not be selected
 * @param predicate predicate passed through to the underlying policy
 * @param ensemble ensemble passed through to the underlying policy
 * @param fallbackToRandom passed through to the underlying policy
 * @return the bookie chosen by the parent policy, or by the slave policy on failure
 * @throws BKException.BKNotEnoughBookiesException if the parent fails and there is no
 *         slave, or if the slave fails as well
 */
@Override
public BookieNode selectFromNetworkLocation(
        Set<String> excludeRacks,
        Set<Node> excludeBookies,
        Predicate<BookieNode> predicate,
        Ensemble<BookieNode> ensemble,
        boolean fallbackToRandom)
        throws BKException.BKNotEnoughBookiesException {
    try {
        return super.selectFromNetworkLocation(excludeRacks, excludeBookies, predicate, ensemble, fallbackToRandom);
    } catch (BKException.BKNotEnoughBookiesException primaryFailure) {
        // Retry on the slave policy when one was configured.
        if (slave != null) {
            return slave.selectFromNetworkLocation(excludeRacks, excludeBookies, predicate, ensemble,
                    fallbackToRandom);
        }
        throw primaryFailure;
    }
}

/**
 * Selects a bookie preferring {@code networkLoc}, delegating to the parent
 * implementation first.
 *
 * <p>If the parent throws {@link BKException.BKNotEnoughBookiesException} and a
 * {@code slave} policy exists, the same request is retried on the slave. {@code slave}
 * is only created by {@code initialize} when {@code stabilizePeriodSeconds > 0};
 * otherwise it is {@code null} and the parent's exception is rethrown.
 *
 * @param networkLoc rack to try first
 * @param excludeRacks racks to avoid, passed through to the underlying policy
 * @param excludeBookies bookies that must not be selected
 * @param predicate predicate passed through to the underlying policy
 * @param ensemble ensemble passed through to the underlying policy
 * @param fallbackToRandom passed through to the underlying policy
 * @return the bookie chosen by the parent policy, or by the slave policy on failure
 * @throws BKNotEnoughBookiesException if the parent fails and there is no slave, or if
 *         the slave fails as well
 */
@Override
public BookieNode selectFromNetworkLocation(
        String networkLoc,
        Set<String> excludeRacks,
        Set<Node> excludeBookies,
        Predicate<BookieNode> predicate,
        Ensemble<BookieNode> ensemble,
        boolean fallbackToRandom)
        throws BKNotEnoughBookiesException {
    try {
        return super.selectFromNetworkLocation(networkLoc, excludeRacks, excludeBookies, predicate, ensemble,
                fallbackToRandom);
    } catch (BKException.BKNotEnoughBookiesException primaryFailure) {
        // Retry on the slave policy when one was configured.
        if (slave != null) {
            return slave.selectFromNetworkLocation(networkLoc, excludeRacks, excludeBookies, predicate, ensemble,
                    fallbackToRandom);
        }
        throw primaryFailure;
    }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,9 @@ public class RackawareEnsemblePlacementPolicyImpl extends TopologyAwareEnsembleP
int maxWeightMultiple;
private Map<BookieNode, WeightedObject> bookieInfoMap = new HashMap<BookieNode, WeightedObject>();
private WeightedRandomSelection<BookieNode> weightedSelection;

protected int minNumRacksPerWriteQuorum;
protected boolean enforceMinNumRacksPerWriteQuorum;

public static final String REPP_DNS_RESOLVER_CLASS = "reppDnsResolverClass";
public static final String REPP_RANDOM_READ_REORDERING = "ensembleRandomReadReordering";
Expand Down Expand Up @@ -238,6 +240,7 @@ protected RackawareEnsemblePlacementPolicyImpl initialize(DNSToSwitchMapping dns
boolean isWeighted,
int maxWeightMultiple,
int minNumRacksPerWriteQuorum,
boolean enforceMinNumRacksPerWriteQuorum,
StatsLogger statsLogger) {
checkNotNull(statsLogger, "statsLogger should not be null, use NullStatsLogger instead.");
this.statsLogger = statsLogger;
Expand All @@ -250,6 +253,7 @@ protected RackawareEnsemblePlacementPolicyImpl initialize(DNSToSwitchMapping dns
this.dnsResolver = new DNSResolverDecorator(dnsResolver, () -> this.getDefaultRack());
this.timer = timer;
this.minNumRacksPerWriteQuorum = minNumRacksPerWriteQuorum;
this.enforceMinNumRacksPerWriteQuorum = enforceMinNumRacksPerWriteQuorum;

// create the network topology
if (stabilizePeriodSeconds > 0) {
Expand Down Expand Up @@ -339,6 +343,7 @@ public Long load(BookieSocketAddress key) throws Exception {
conf.getDiskWeightBasedPlacementEnabled(),
conf.getBookieMaxWeightMultipleForWeightBasedPlacement(),
conf.getMinNumRacksPerWriteQuorum(),
conf.getEnforceMinNumRacksPerWriteQuorum(),
statsLogger);
}

Expand Down Expand Up @@ -536,6 +541,10 @@ protected ArrayList<BookieSocketAddress> newEnsembleInternal(
int numRacks = topology.getNumOfRacks();
// only one rack, use the random algorithm.
if (numRacks < 2) {
if (enforceMinNumRacksPerWriteQuorum && (minNumRacksPerWriteQuorumForThisEnsemble > 1)) {
LOG.error("Only one rack available and minNumRacksPerWriteQuorum is enforced, so giving up");
throw new BKNotEnoughBookiesException();
}
List<BookieNode> bns = selectRandom(ensembleSize, excludeNodes, TruePredicate.INSTANCE,
ensemble);
ArrayList<BookieSocketAddress> addrs = new ArrayList<BookieSocketAddress>(ensembleSize);
Expand Down Expand Up @@ -610,7 +619,9 @@ protected ArrayList<BookieSocketAddress> newEnsembleInternal(
}
curRack = sb.toString();
}
prevNode = selectFromNetworkLocation(curRack, excludeNodes, ensemble, ensemble);
boolean firstBookieInTheEnsemble = (null == prevNode);
prevNode = selectFromNetworkLocation(curRack, excludeNodes, ensemble, ensemble,
!enforceMinNumRacksPerWriteQuorum || firstBookieInTheEnsemble);
racks[i] = prevNode.getNetworkLocation();
}
ArrayList<BookieSocketAddress> bookieList = ensemble.toList();
Expand Down Expand Up @@ -657,7 +668,8 @@ public BookieSocketAddress replaceBookie(int ensembleSize, int writeQuorumSize,
networkLocationsToBeExcluded,
excludeNodes,
TruePredicate.INSTANCE,
EnsembleForReplacementWithNoConstraints.INSTANCE);
EnsembleForReplacementWithNoConstraints.INSTANCE,
!enforceMinNumRacksPerWriteQuorum);
if (LOG.isDebugEnabled()) {
LOG.debug("Bookie {} is chosen to replace bookie {}.", candidate, bn);
}
Expand Down Expand Up @@ -698,12 +710,20 @@ public BookieNode selectFromNetworkLocation(
String networkLoc,
Set<Node> excludeBookies,
Predicate<BookieNode> predicate,
Ensemble<BookieNode> ensemble)
Ensemble<BookieNode> ensemble,
boolean fallbackToRandom)
throws BKNotEnoughBookiesException {
// select one from local rack
try {
return selectRandomFromRack(networkLoc, excludeBookies, predicate, ensemble);
} catch (BKNotEnoughBookiesException e) {
if (!fallbackToRandom) {
LOG.error(
"Failed to choose a bookie from {} : "
+ "excluded {}, enforceMinNumRacksPerWriteQuorum is enabled so giving up.",
networkLoc, excludeBookies);
throw e;
}
LOG.warn("Failed to choose a bookie from {} : "
+ "excluded {}, fallback to choose bookie randomly from the cluster.",
networkLoc, excludeBookies);
Expand All @@ -712,28 +732,25 @@ public BookieNode selectFromNetworkLocation(
}
}

protected BookieNode selectFromNetworkLocation(String networkLoc,
@Override
public BookieNode selectFromNetworkLocation(String networkLoc,
Set<String> excludeRacks,
Set<Node> excludeBookies,
Predicate<BookieNode> predicate,
Ensemble<BookieNode> ensemble)
Ensemble<BookieNode> ensemble,
boolean fallbackToRandom)
throws BKNotEnoughBookiesException {
// first attempt to select one from local rack
try {
return selectRandomFromRack(networkLoc, excludeBookies, predicate, ensemble);
} catch (BKNotEnoughBookiesException e) {
if (isWeighted) {
// if weight based selection is enabled, randomly select one from the whole cluster
// based on weights and ignore the provided <tt>excludeRacks</tt>.
// randomly choose one from whole cluster, ignore the provided predicate.
return selectRandom(1, excludeBookies, predicate, ensemble).get(0);
} else {
// if weight based selection is disabled, and there is no enough bookie from local rack,
// select bookies from the whole cluster and exclude the racks specified at <tt>excludeRacks</tt>.
return selectFromNetworkLocation(excludeRacks, excludeBookies, predicate, ensemble);
}
/*
* there is no enough bookie from local rack, select bookies from
* the whole cluster and exclude the racks specified at
* <tt>excludeRacks</tt>.
*/
return selectFromNetworkLocation(excludeRacks, excludeBookies, predicate, ensemble, fallbackToRandom);
}

}


Expand All @@ -742,27 +759,39 @@ protected BookieNode selectFromNetworkLocation(String networkLoc,
* <i>excludeBookies</i> set. If it fails to find one and <i>fallbackToRandom</i> is true, it selects a random
* {@link BookieNode} from the whole cluster; otherwise the {@link BKNotEnoughBookiesException} is rethrown.
*/
protected BookieNode selectFromNetworkLocation(Set<String> excludeRacks,
@Override
public BookieNode selectFromNetworkLocation(Set<String> excludeRacks,
Set<Node> excludeBookies,
Predicate<BookieNode> predicate,
Ensemble<BookieNode> ensemble)
Ensemble<BookieNode> ensemble,
boolean fallbackToRandom)
throws BKNotEnoughBookiesException {
List<BookieNode> knownNodes = new ArrayList<>(knownBookies.values());
Collections.shuffle(knownNodes);

List<BookieNode> knownNodes = new ArrayList<>(knownBookies.values());
Set<Node> fullExclusionBookiesList = new HashSet<Node>(excludeBookies);
for (BookieNode knownNode : knownNodes) {
if (excludeBookies.contains(knownNode)) {
continue;
}
if (excludeRacks.contains(knownNode.getNetworkLocation())) {
continue;
fullExclusionBookiesList.add(knownNode);
}
}

try {
return selectRandomInternal(knownNodes, 1, fullExclusionBookiesList, TruePredicate.INSTANCE,
EnsembleForReplacementWithNoConstraints.INSTANCE).get(0);
} catch (BKNotEnoughBookiesException e) {
if (!fallbackToRandom) {
LOG.error(
"Failed to choose a bookie excluding Racks: {} "
+ "Nodes: {}, enforceMinNumRacksPerWriteQuorum is enabled so giving up.",
excludeRacks, excludeBookies);
throw e;
}
return knownNode;

LOG.warn("Failed to choose a bookie: excluded {}, fallback to choose bookie randomly from the cluster.",
excludeBookies);
// randomly choose one from whole cluster
return selectRandom(1, excludeBookies, predicate, ensemble).get(0);
}
LOG.warn("Failed to choose a bookie: excluded {}, fallback to choose bookie randomly from the cluster.",
excludeBookies);
// randomly choose one from whole cluster
return selectRandom(1, excludeBookies, predicate, ensemble).get(0);
}

private WeightedRandomSelection<BookieNode> prepareForWeightedSelection(List<Node> leaves) {
Expand Down
Loading

0 comments on commit 488b202

Please sign in to comment.