Skip to content

Commit

Permalink
BookKeeper client should try not to use bookies with errors/timeouts …
Browse files Browse the repository at this point in the history
…when forming a new ensemble

…hen forming ensembles

Author: Siddharth Boobna <sboobna@yahoo-inc.com>

Reviewers: Sijie Guo <sijie@apache.org>

Closes #11 from sboobna/BOOKKEEPER-889
  • Loading branch information
Siddharth Boobna authored and sijie committed Mar 8, 2016
1 parent 96adbf1 commit c8255f8
Show file tree
Hide file tree
Showing 12 changed files with 497 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ public static BKException create(int code) {
return new BKAddEntryQuorumTimeoutException();
case Code.DuplicateEntryIdException:
return new BKDuplicateEntryIdException();
case Code.TimeoutException:
return new BKTimeoutException();
default:
return new BKUnexpectedConditionException();
}
Expand Down Expand Up @@ -131,6 +133,7 @@ public interface Code {
int LedgerExistException = -20;
int AddEntryQuorumTimeoutException = -21;
int DuplicateEntryIdException = -22;
int TimeoutException = -23;

int IllegalOpException = -100;
int LedgerFencedException = -101;
Expand Down Expand Up @@ -213,6 +216,8 @@ public static String getMessage(int code) {
return "Invalid operation";
case Code.AddEntryQuorumTimeoutException:
return "Add entry quorum wait timed out";
case Code.TimeoutException:
return "Bookie operation timeout";
default:
return "Unexpected condition";
}
Expand Down Expand Up @@ -392,4 +397,9 @@ public BKClientClosedException() {
}
}

/**
 * Exception carrying the {@code Code.TimeoutException} return code.
 */
public static class BKTimeoutException extends BKException {
/** Creates the exception with the {@code Code.TimeoutException} code. */
public BKTimeoutException() {
super(Code.TimeoutException);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,15 @@
package org.apache.bookkeeper.client;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import com.google.common.base.Preconditions;

import org.apache.bookkeeper.client.AsyncCallback.CreateCallback;
import org.apache.bookkeeper.client.AsyncCallback.DeleteCallback;
import org.apache.bookkeeper.client.AsyncCallback.OpenCallback;
Expand All @@ -37,13 +39,15 @@
import org.apache.bookkeeper.meta.LedgerIdGenerator;
import org.apache.bookkeeper.meta.LedgerManager;
import org.apache.bookkeeper.meta.LedgerManagerFactory;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookieClient;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.util.OrderedSafeExecutor;
import org.apache.bookkeeper.util.ReflectionUtils;
import org.apache.bookkeeper.util.SafeRunnable;
import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy;
import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
import org.apache.commons.configuration.ConfigurationException;
Expand Down Expand Up @@ -312,6 +316,8 @@ private BookKeeper(ClientConfiguration conf,
this.ledgerManagerFactory = LedgerManagerFactory.newLedgerManagerFactory(conf, this.zk);
this.ledgerManager = new CleanupLedgerManager(ledgerManagerFactory.newLedgerManager());
this.ledgerIdGenerator = ledgerManagerFactory.newLedgerIdGenerator();

scheduleBookieHealthCheckIfEnabled();
}

private EnsemblePlacementPolicy initializeEnsemblePlacementPolicy(ClientConfiguration conf)
Expand All @@ -336,6 +342,26 @@ int getReturnRc(int rc) {
}
}

/**
 * Starts the periodic bookie health check when the client configuration enables it.
 *
 * <p>The task calls {@link #checkForFaultyBookies()} on the scheduler at a fixed rate. The
 * configured health check interval (in seconds) is used as both the initial delay and the
 * period. Nothing is scheduled when the health check is disabled.
 */
void scheduleBookieHealthCheckIfEnabled() {
    if (!conf.isBookieHealthCheckEnabled()) {
        return;
    }

    final SafeRunnable healthCheckTask = new SafeRunnable() {
        @Override
        public void safeRun() {
            checkForFaultyBookies();
        }
    };
    scheduler.scheduleAtFixedRate(
            healthCheckTask,
            conf.getBookieHealthCheckIntervalSeconds(),
            conf.getBookieHealthCheckIntervalSeconds(),
            TimeUnit.SECONDS);
}

/**
 * Asks the bookie client for the bookies it currently reports as faulty and hands each
 * one to the bookie watcher for quarantine.
 */
void checkForFaultyBookies() {
    for (BookieSocketAddress bookie : bookieClient.getFaultyBookies()) {
        bookieWatcher.quarantineBookie(bookie);
    }
}

LedgerManager getLedgerManager() {
return ledgerManager;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;

/**
* This class is responsible for maintaining a consistent view of what bookies
* are available by reading Zookeeper (and setting watches on the bookie nodes).
Expand All @@ -57,6 +62,7 @@ class BookieWatcher implements Watcher, ChildrenCallback {

public static int ZK_CONNECT_BACKOFF_SEC = 1;
private static final Set<BookieSocketAddress> EMPTY_SET = new HashSet<BookieSocketAddress>();
// Placeholder value stored for every key of the quarantine cache; only key presence matters.
// Boolean.TRUE is the canonical instance, so no extra object is allocated through the
// deprecated Boolean(boolean) constructor.
private static final Boolean BOOLEAN = Boolean.TRUE;

// Bookie registration path in ZK
private final String bookieRegistrationPath;
Expand All @@ -65,6 +71,9 @@ class BookieWatcher implements Watcher, ChildrenCallback {
final ScheduledExecutorService scheduler;
final EnsemblePlacementPolicy placementPolicy;

// Bookies that will not be preferred to be chosen in a new ensemble
final Cache<BookieSocketAddress, Boolean> quarantinedBookies;

SafeRunnable reReadTask = new SafeRunnable() {
@Override
public void safeRun() {
Expand All @@ -83,6 +92,16 @@ public BookieWatcher(ClientConfiguration conf,
this.scheduler = scheduler;
this.placementPolicy = placementPolicy;
readOnlyBookieWatcher = new ReadOnlyBookieWatcher(conf, bk);
this.quarantinedBookies = CacheBuilder.newBuilder()
.expireAfterWrite(conf.getBookieQuarantineTimeSeconds(), TimeUnit.SECONDS)
.removalListener(new RemovalListener<BookieSocketAddress, Boolean>() {

@Override
public void onRemoval(RemovalNotification<BookieSocketAddress, Boolean> bookie) {
logger.info("Bookie {} is no longer quarantined", bookie.getKey());
}

}).build();
}

void notifyBookiesChanged(final BookiesListener listener) throws BKException {
Expand Down Expand Up @@ -235,7 +254,16 @@ public void processResult(int rc, String path, Object ctx, List<String> children
*/
public ArrayList<BookieSocketAddress> newEnsemble(int ensembleSize, int writeQuorumSize)
        throws BKNotEnoughBookiesException {
    // Snapshot of the currently quarantined bookies, used as the exclusion set for the
    // first attempt so that only healthy bookies are considered.
    final Set<BookieSocketAddress> quarantined =
            new HashSet<BookieSocketAddress>(quarantinedBookies.asMap().keySet());
    try {
        return placementPolicy.newEnsemble(ensembleSize, writeQuorumSize, quarantined);
    } catch (BKNotEnoughBookiesException notEnoughHealthy) {
        // Fall back to the full bookie set: a quarantined bookie is better than no ensemble.
        if (logger.isDebugEnabled()) {
            logger.debug("Not enough healthy bookies available, using quarantined bookies");
        }
        return placementPolicy.newEnsemble(ensembleSize, writeQuorumSize, EMPTY_SET);
    }
}

/**
Expand All @@ -250,7 +278,29 @@ public ArrayList<BookieSocketAddress> newEnsemble(int ensembleSize, int writeQuo
public BookieSocketAddress replaceBookie(List<BookieSocketAddress> existingBookies, int bookieIdx)
        throws BKNotEnoughBookiesException {
    final BookieSocketAddress bookieToReplace = existingBookies.get(bookieIdx);

    // First attempt: exclude the bookies already in the ensemble plus every quarantined bookie.
    final Set<BookieSocketAddress> excluded = new HashSet<BookieSocketAddress>(existingBookies);
    excluded.addAll(quarantinedBookies.asMap().keySet());
    try {
        return placementPolicy.replaceBookie(bookieToReplace, excluded);
    } catch (BKNotEnoughBookiesException notEnoughHealthy) {
        // Fall back to excluding only the existing ensemble members.
        if (logger.isDebugEnabled()) {
            logger.debug("Not enough healthy bookies available, using quarantined bookies");
        }
        return placementPolicy.replaceBookie(bookieToReplace,
                new HashSet<BookieSocketAddress>(existingBookies));
    }
}

/**
 * Quarantine <i>bookie</i> so it will not be preferred to be chosen for new ensembles.
 *
 * <p>A bookie that is already present in the quarantine cache is left untouched: its
 * entry is not rewritten and no warning is logged again.
 *
 * @param bookie address of the bookie to quarantine
 */
public void quarantineBookie(BookieSocketAddress bookie) {
    // putIfAbsent on the cache's concurrent map view performs the presence check and the
    // insert as one atomic step, so concurrent callers cannot both insert and both log.
    if (quarantinedBookies.asMap().putIfAbsent(bookie, BOOLEAN) == null) {
        logger.warn("Bookie {} has been quarantined because of read/write errors.", bookie);
    }
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,12 @@ public class ClientConfiguration extends AbstractConfiguration {
protected final static String PCBC_TIMEOUT_TIMER_TICK_DURATION_MS = "pcbcTimeoutTimerTickDurationMs";
protected final static String PCBC_TIMEOUT_TIMER_NUM_TICKS = "pcbcTimeoutTimerNumTicks";

// Bookie health check settings
protected final static String BOOKIE_HEALTH_CHECK_ENABLED = "bookieHealthCheckEnabled";
protected final static String BOOKIE_HEALTH_CHECK_INTERVAL_SECONDS = "bookieHealthCheckIntervalSeconds";
protected final static String BOOKIE_ERROR_THRESHOLD_PER_INTERVAL = "bookieErrorThresholdPerInterval";
protected final static String BOOKIE_QUARANTINE_TIME_SECONDS = "bookieQuarantineTimeSeconds";

// Number of worker threads
protected final static String NUM_WORKER_THREADS = "numWorkerThreads";

Expand Down Expand Up @@ -691,4 +697,107 @@ public ClientConfiguration setTaskExecutionWarnTimeMicros(long warnTime) {
setProperty(TASK_EXECUTION_WARN_TIME_MICROS, warnTime);
return this;
}

/**
 * Check if bookie health check is enabled.
 *
 * @return the configured value of the health check flag, or {@code false} when it is not set
 */
public boolean isBookieHealthCheckEnabled() {
return getBoolean(BOOKIE_HEALTH_CHECK_ENABLED, false);
}

/**
 * Enables the bookie health check.
 *
 * <p>
 * If the number of read/write errors for a bookie exceeds {@link #getBookieErrorThresholdPerInterval()} per
 * interval, that bookie is quarantined for {@link #getBookieQuarantineTimeSeconds()} seconds. During this
 * quarantined period, the client will try not to use this bookie when creating new ensembles.
 * </p>
 *
 * <p>
 * By default, the bookie health check is <b>disabled</b>.
 * </p>
 *
 * @return client configuration
 */
public ClientConfiguration enableBookieHealthCheck() {
setProperty(BOOKIE_HEALTH_CHECK_ENABLED, true);
return this;
}

/**
 * Get the bookie health check interval in seconds.
 *
 * @return the configured interval in seconds, or 60 when it is not set
 */
public int getBookieHealthCheckIntervalSeconds() {
return getInt(BOOKIE_HEALTH_CHECK_INTERVAL_SECONDS, 60);
}

/**
 * Set how often the bookie health check runs. Default is 60 seconds.
 *
 * <p>
 * The value is stored in whole seconds; {@link TimeUnit#toSeconds(long)} truncates, so a
 * duration shorter than one second is stored as 0.
 * </p>
 *
 * <p>
 * Note: Please {@link #enableBookieHealthCheck()} to use this configuration.
 * </p>
 *
 * @param interval
 *            length of the interval, expressed in <i>unit</i>
 * @param unit
 *            time unit of <i>interval</i>
 * @return client configuration
 */
public ClientConfiguration setBookieHealthCheckInterval(int interval, TimeUnit unit) {
    final long intervalSeconds = unit.toSeconds(interval);
    setProperty(BOOKIE_HEALTH_CHECK_INTERVAL_SECONDS, intervalSeconds);
    return this;
}

/**
 * Get the error threshold for a bookie to be quarantined.
 *
 * @return the configured error threshold per interval, or 100 when it is not set
 */
public long getBookieErrorThresholdPerInterval() {
return getLong(BOOKIE_ERROR_THRESHOLD_PER_INTERVAL, 100);
}

/**
 * Set the error threshold per interval ({@link #getBookieHealthCheckIntervalSeconds()}) for a bookie before it is
 * quarantined. Default is 100 errors per minute.
 *
 * <p>
 * Note: Please {@link #enableBookieHealthCheck()} to use this configuration.
 * </p>
 *
 * @param thresholdPerInterval
 *            number of errors per health check interval
 * @return client configuration
 */
public ClientConfiguration setBookieErrorThresholdPerInterval(long thresholdPerInterval) {
setProperty(BOOKIE_ERROR_THRESHOLD_PER_INTERVAL, thresholdPerInterval);
return this;
}

/**
 * Get the time for which a bookie will be quarantined.
 *
 * @return the configured quarantine time in seconds, or 1800 when it is not set
 */
public int getBookieQuarantineTimeSeconds() {
return getInt(BOOKIE_QUARANTINE_TIME_SECONDS, 1800);
}

/**
 * Set how long a bookie stays quarantined. Default is 30 minutes.
 *
 * <p>
 * The value is stored in whole seconds; {@link TimeUnit#toSeconds(long)} truncates, so a
 * duration shorter than one second is stored as 0.
 * </p>
 *
 * <p>
 * Note: Please {@link #enableBookieHealthCheck()} to use this configuration.
 * </p>
 *
 * @param quarantineTime
 *            length of the quarantine, expressed in <i>unit</i>
 * @param unit
 *            time unit of <i>quarantineTime</i>
 * @return client configuration
 */
public ClientConfiguration setBookieQuarantineTime(int quarantineTime, TimeUnit unit) {
    final long quarantineSeconds = unit.toSeconds(quarantineTime);
    setProperty(BOOKIE_QUARANTINE_TIME_SECONDS, quarantineSeconds);
    return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,13 @@
import static com.google.common.base.Charsets.UTF_8;

import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.net.BookieSocketAddress;
Expand All @@ -52,6 +50,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
* Implements the client-side part of the BookKeeper protocol.
*
Expand All @@ -70,6 +71,8 @@ public class BookieClient implements PerChannelBookieClientFactory {
private final StatsLogger statsLogger;
private final int numConnectionsPerBookie;

private final long bookieErrorThresholdPerInterval;

public BookieClient(ClientConfiguration conf, ClientSocketChannelFactory channelFactory, OrderedSafeExecutor executor) {
this(conf, channelFactory, executor, NullStatsLogger.INSTANCE);
}
Expand All @@ -87,6 +90,7 @@ public BookieClient(ClientConfiguration conf, ClientSocketChannelFactory channel
new ThreadFactoryBuilder().setNameFormat("BookieClientTimer-%d").build(),
conf.getPCBCTimeoutTimerTickDurationMs(), TimeUnit.MILLISECONDS,
conf.getPCBCTimeoutTimerNumTicks());
this.bookieErrorThresholdPerInterval = conf.getBookieErrorThresholdPerInterval();
}

private int getRc(int rc) {
Expand All @@ -101,10 +105,23 @@ private int getRc(int rc) {
}
}

/**
 * Collects the bookies whose error counter has reached the configured error threshold.
 *
 * <p>Only pools that are {@code DefaultPerChannelBookieClientPool} instances are inspected.
 * Reading a pool's error counter also resets it to zero, so each call reports the errors
 * counted since that pool's counter was last reset.
 *
 * @return addresses of the bookies at or above the error threshold; empty if there are none
 */
public List<BookieSocketAddress> getFaultyBookies() {
    final List<BookieSocketAddress> faulty = Lists.newArrayList();
    for (PerChannelBookieClientPool candidate : channels.values()) {
        if (!(candidate instanceof DefaultPerChannelBookieClientPool)) {
            continue;
        }
        final DefaultPerChannelBookieClientPool defaultPool = (DefaultPerChannelBookieClientPool) candidate;
        // Read and reset in one atomic step so the next interval starts counting from zero.
        final long errorsSinceReset = defaultPool.errorCounter.getAndSet(0);
        if (errorsSinceReset >= bookieErrorThresholdPerInterval) {
            faulty.add(defaultPool.address);
        }
    }
    return faulty;
}

@Override
public PerChannelBookieClient create(BookieSocketAddress address) {
public PerChannelBookieClient create(BookieSocketAddress address, PerChannelBookieClientPool pcbcPool) {
return new PerChannelBookieClient(conf, executor, channelFactory, address,
requestTimer, statsLogger);
requestTimer, statsLogger, pcbcPool);
}

private PerChannelBookieClientPool lookupClient(BookieSocketAddress addr, Object key) {
Expand Down
Loading

0 comments on commit c8255f8

Please sign in to comment.