Skip to content

Commit

Permalink
Remove underreplicaiton callback (#4058)
Browse files Browse the repository at this point in the history
### Motivation

Address https://lists.apache.org/thread/1xl3hr2cpyd5xh9kozbx5xlfsjsg3f4h
Deprecate underreplicaiton callback to relieve zookeeper pressure.
  • Loading branch information
horizonzy authored Aug 29, 2023
1 parent 69f27b7 commit 84a80f5
Show file tree
Hide file tree
Showing 4 changed files with 3 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -240,8 +240,9 @@ boolean initializeLostBookieRecoveryDelay(int lostBookieRecoveryDelay)
* @param cb
* @throws ReplicationException.UnavailableException
*/
void notifyUnderReplicationLedgerChanged(GenericCallback<Void> cb)
throws ReplicationException.UnavailableException;
@Deprecated
default void notifyUnderReplicationLedgerChanged(GenericCallback<Void> cb)
throws ReplicationException.UnavailableException {}

/**
* Receive notification asynchronously when the lostBookieRecoveryDelay value is Changed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,12 @@
package org.apache.bookkeeper.replication;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.SettableFuture;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Executors;
Expand All @@ -46,7 +44,6 @@
import org.apache.bookkeeper.meta.LedgerManager;
import org.apache.bookkeeper.meta.LedgerManagerFactory;
import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
import org.apache.bookkeeper.meta.UnderreplicatedLedger;
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
import org.apache.bookkeeper.replication.ReplicationException.BKAuditException;
Expand Down Expand Up @@ -183,7 +180,6 @@ public Auditor(final String bookieIdentifier,
conf, auditorStats, admin, ledgerManager,
ledgerUnderreplicationManager, shutdownTaskHandler, hasAuditCheckTask);
allAuditorTasks.add(auditorReplicasCheckTask);

executor = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Expand Down Expand Up @@ -393,8 +389,6 @@ public void start() {
knownBookies = getAvailableBookies();
this.ledgerUnderreplicationManager
.notifyLostBookieRecoveryDelayChanged(new LostBookieRecoveryDelayChangedCb());
this.ledgerUnderreplicationManager.notifyUnderReplicationLedgerChanged(
new UnderReplicatedLedgersChangedCb());
} catch (BKException bke) {
LOG.error("Couldn't get bookie list, so exiting", bke);
submitShutdownTask();
Expand All @@ -404,7 +398,6 @@ public void start() {
submitShutdownTask();
return;
}

scheduleBookieCheckTask();
scheduleCheckAllLedgersTask();
schedulePlacementPolicyCheckTask();
Expand Down Expand Up @@ -558,16 +551,6 @@ private void scheduleReplicasCheckTask() {
executor.scheduleAtFixedRate(auditorReplicasCheckTask, initialDelay, interval, TimeUnit.SECONDS);
}

private class UnderReplicatedLedgersChangedCb implements GenericCallback<Void> {
@Override
public void operationComplete(int rc, Void result) {
Iterator<UnderreplicatedLedger> underreplicatedLedgersInfo = ledgerUnderreplicationManager
.listLedgersToRereplicate(null);
auditorStats.getUnderReplicatedLedgersGuageValue().set(Iterators.size(underreplicatedLedgersInfo));
auditorStats.getNumReplicatedLedgers().inc();
}
}

private class LostBookieRecoveryDelayChangedCb implements GenericCallback<Void> {
@Override
public void operationComplete(int rc, Void result) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,9 @@
import static org.apache.bookkeeper.replication.ReplicationStats.NUM_LEDGERS_HAVING_NO_REPLICA_OF_AN_ENTRY;
import static org.apache.bookkeeper.replication.ReplicationStats.NUM_LEDGERS_NOT_ADHERING_TO_PLACEMENT_POLICY;
import static org.apache.bookkeeper.replication.ReplicationStats.NUM_LEDGERS_SOFTLY_ADHERING_TO_PLACEMENT_POLICY;
import static org.apache.bookkeeper.replication.ReplicationStats.NUM_REPLICATED_LEDGERS;
import static org.apache.bookkeeper.replication.ReplicationStats.NUM_SKIPPING_CHECK_TASK_TIMES;
import static org.apache.bookkeeper.replication.ReplicationStats.NUM_UNDERREPLICATED_LEDGERS_ELAPSED_RECOVERY_GRACE_PERIOD;
import static org.apache.bookkeeper.replication.ReplicationStats.NUM_UNDER_REPLICATED_LEDGERS;
import static org.apache.bookkeeper.replication.ReplicationStats.NUM_UNDER_REPLICATED_LEDGERS_GUAGE;
import static org.apache.bookkeeper.replication.ReplicationStats.PLACEMENT_POLICY_CHECK_TIME;
import static org.apache.bookkeeper.replication.ReplicationStats.REPLICAS_CHECK_TIME;
import static org.apache.bookkeeper.replication.ReplicationStats.UNDER_REPLICATED_LEDGERS_TOTAL_SIZE;
Expand Down Expand Up @@ -130,11 +128,6 @@ public class AuditorStats {
help = "the number of delayed-bookie-audits cancelled"
)
private final Counter numDelayedBookieAuditsCancelled;
@StatsDoc(
name = NUM_REPLICATED_LEDGERS,
help = "the number of replicated ledgers"
)
private final Counter numReplicatedLedgers;
@StatsDoc(
name = NUM_LEDGERS_NOT_ADHERING_TO_PLACEMENT_POLICY,
help = "Gauge for number of ledgers not adhering to placement policy found in placement policy check"
Expand Down Expand Up @@ -167,11 +160,6 @@ public class AuditorStats {
+ ", this doesn't include ledgers counted towards numLedgersHavingLessThanAQReplicasOfAnEntry"
)
private final Gauge<Integer> numLedgersHavingLessThanWQReplicasOfAnEntry;
@StatsDoc(
name = NUM_UNDER_REPLICATED_LEDGERS_GUAGE,
help = "Gauge for num of underreplicated ledgers"
)
private final Gauge<Integer> numUnderReplicatedLedgers;
@StatsDoc(
name = NUM_SKIPPING_CHECK_TASK_TIMES,
help = "the times of auditor check task skipped"
Expand Down Expand Up @@ -203,7 +191,6 @@ public AuditorStats(StatsLogger statsLogger) {
numBookieAuditsDelayed = this.statsLogger.getCounter(ReplicationStats.NUM_BOOKIE_AUDITS_DELAYED);
numDelayedBookieAuditsCancelled = this.statsLogger
.getCounter(ReplicationStats.NUM_DELAYED_BOOKIE_AUDITS_DELAYES_CANCELLED);
numReplicatedLedgers = this.statsLogger.getCounter(NUM_REPLICATED_LEDGERS);
numSkippingCheckTaskTimes = this.statsLogger.getCounter(NUM_SKIPPING_CHECK_TASK_TIMES);
numLedgersNotAdheringToPlacementPolicy = new Gauge<Integer>() {
@Override
Expand Down Expand Up @@ -285,17 +272,5 @@ public Integer getSample() {
};
this.statsLogger.registerGauge(ReplicationStats.NUM_LEDGERS_HAVING_LESS_THAN_WQ_REPLICAS_OF_AN_ENTRY,
numLedgersHavingLessThanWQReplicasOfAnEntry);
numUnderReplicatedLedgers = new Gauge<Integer>() {
@Override
public Integer getDefaultValue() {
return 0;
}

@Override
public Integer getSample() {
return underReplicatedLedgersGuageValue.get();
}
};
this.statsLogger.registerGauge(NUM_UNDER_REPLICATED_LEDGERS_GUAGE, numUnderReplicatedLedgers);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,6 @@ public interface ReplicationStats {
String REPLICATE_EXCEPTION = "exceptions";
String NUM_DEFER_LEDGER_LOCK_RELEASE_OF_FAILED_LEDGER = "NUM_DEFER_LEDGER_LOCK_RELEASE_OF_FAILED_LEDGER";
String NUM_ENTRIES_UNABLE_TO_READ_FOR_REPLICATION = "NUM_ENTRIES_UNABLE_TO_READ_FOR_REPLICATION";
String NUM_UNDER_REPLICATED_LEDGERS_GUAGE = "NUM_UNDER_REPLICATED_LEDGERS_GUAGE";
String NUM_REPLICATED_LEDGERS = "NUM_REPLICATED_LEDGERS";
String NUM_NOT_ADHERING_PLACEMENT_LEDGERS_REPLICATED = "NUM_NOT_ADHERING_PLACEMENT_LEDGERS_REPLICATED";
String NUM_SKIPPING_CHECK_TASK_TIMES = "NUM_SKIPPING_CHECK_TASK_TIMES";
}

0 comments on commit 84a80f5

Please sign in to comment.