Skip to content

Commit

Permalink
fixed other bugs
Browse files Browse the repository at this point in the history
  • Loading branch information
heesung-sn committed Feb 16, 2024
1 parent c94cc8c commit 2fbf000
Show file tree
Hide file tree
Showing 9 changed files with 206 additions and 125 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -943,6 +943,9 @@ public void start() throws PulsarServerException {
LOG.info("messaging service is ready, bootstrap_seconds={}, {}, cluster={}, configs={}",
bootstrapTimeSeconds, bootstrapMessage, config.getClusterName(), config);

// ExtensibleLoadManagerImpl configures system topics at the end.
ExtensibleLoadManagerImpl.configureSystemTopics(this);

state = State.Started;
} catch (Exception e) {
LOG.error("Failed to start Pulsar service: {}", e.getMessage(), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static java.lang.String.format;
import static org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl.Role.Follower;
import static org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl.Role.Leader;
import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.TOPIC;
import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Label.Success;
import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Admin;
import static org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared.getNamespaceBundle;
Expand Down Expand Up @@ -118,6 +119,8 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {

private static final long MONITOR_INTERVAL_IN_MILLIS = 120_000;

public static final long COMPACTION_THRESHOLD = 5 * 1024 * 1024;

private static final String ELECTION_ROOT = "/loadbalance/extension/leader";

private PulsarService pulsar;
Expand Down Expand Up @@ -270,7 +273,7 @@ public static boolean isLoadManagerExtensionEnabled(ServiceConfiguration conf) {
}

public static boolean isLoadManagerExtensionEnabled(PulsarService pulsar) {
return pulsar.getLoadManager().get() instanceof ExtensibleLoadManagerImpl;
return pulsar.getLoadManager().get() instanceof ExtensibleLoadManagerWrapper;
}

public static ExtensibleLoadManagerImpl get(LoadManager loadManager) {
Expand Down Expand Up @@ -311,6 +314,20 @@ private static void createSystemTopics(PulsarService pulsar) throws PulsarServer
createSystemTopic(pulsar, TOP_BUNDLES_LOAD_DATA_STORE_TOPIC);
}

public static void configureSystemTopics(PulsarService pulsar) throws PulsarServerException, PulsarAdminException {
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)
&& pulsar.getConfiguration().isSystemTopicAndTopicLevelPoliciesEnabled()) {
Long threshold = pulsar.getAdminClient().topicPolicies().getCompactionThreshold(TOPIC);
if (threshold == null || COMPACTION_THRESHOLD != threshold.longValue()) {
pulsar.getAdminClient().topicPolicies().setCompactionThreshold(TOPIC, COMPACTION_THRESHOLD);
log.info("Set compaction threshold: {} bytes for system topic {}.", COMPACTION_THRESHOLD, TOPIC);
}
} else {
log.warn("System topic or topic level policies is disabled. "
+ "{} compaction threshold follows the broker or namespace policies.", TOPIC);
}
}

/**
* Gets the assigned broker for the given topic.
* @param pulsar PulsarService instance
Expand Down Expand Up @@ -443,6 +460,7 @@ public void start() throws PulsarServerException {
this.splitScheduler.start();
this.initWaiter.countDown();
this.started = true;
log.info("Started load manager.");
} catch (Exception ex) {
if (this.brokerRegistry != null) {
brokerRegistry.close();
Expand Down Expand Up @@ -575,7 +593,7 @@ private CompletableFuture<Optional<BrokerLookupData>> dedupeLookupRequest(
if (ex != null) {
assignCounter.incrementFailure();
}
lookupRequests.remove(key, newFutureCreated.getValue());
lookupRequests.remove(key);
});
}
}
Expand Down Expand Up @@ -788,13 +806,13 @@ public void close() throws PulsarServerException {
}

public static boolean isInternalTopic(String topic) {
return topic.startsWith(ServiceUnitStateChannelImpl.TOPIC)
return topic.startsWith(TOPIC)
|| topic.startsWith(BROKER_LOAD_DATA_STORE_TOPIC)
|| topic.startsWith(TOP_BUNDLES_LOAD_DATA_STORE_TOPIC);
}

@VisibleForTesting
void playLeader() {
synchronized void playLeader() {
log.info("This broker:{} is setting the role from {} to {}",
pulsar.getBrokerId(), role, Leader);
int retry = 0;
Expand All @@ -812,7 +830,7 @@ void playLeader() {
serviceUnitStateChannel.scheduleOwnershipMonitor();
break;
} catch (Throwable e) {
log.error("The broker:{} failed to set the role. Retrying {} th ...",
log.warn("The broker:{} failed to set the role. Retrying {} th ...",
pulsar.getBrokerId(), ++retry, e);
try {
Thread.sleep(Math.min(retry * 10, MAX_ROLE_CHANGE_RETRY_DELAY_IN_MILLIS));
Expand All @@ -832,7 +850,7 @@ void playLeader() {
}

@VisibleForTesting
void playFollower() {
synchronized void playFollower() {
log.info("This broker:{} is setting the role from {} to {}",
pulsar.getBrokerId(), role, Follower);
int retry = 0;
Expand All @@ -846,7 +864,7 @@ void playFollower() {
topBundlesLoadDataStore.startProducer();
break;
} catch (Throwable e) {
log.error("The broker:{} failed to set the role. Retrying {} th ...",
log.warn("The broker:{} failed to set the role. Retrying {} th ...",
pulsar.getBrokerId(), ++retry, e);
try {
Thread.sleep(Math.min(retry * 10, MAX_ROLE_CHANGE_RETRY_DELAY_IN_MILLIS));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
Expand Down Expand Up @@ -121,7 +122,6 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
private static final long MAX_CHANNEL_OWNER_ELECTION_WAITING_TIME_IN_SECS = 10;
private static final int MAX_OUTSTANDING_PUB_MESSAGES = 500;
private static final long MAX_OWNED_BUNDLE_COUNT_DELAY_TIME_IN_MILLIS = 10 * 60 * 1000;
private static final long COMPACTION_THRESHOLD = 5 * 1024 * 1024;
private final PulsarService pulsar;
private final ServiceConfiguration config;
private final Schema<ServiceUnitStateData> schema;
Expand Down Expand Up @@ -299,14 +299,6 @@ public synchronized void start() throws PulsarServerException {

ExtensibleLoadManagerImpl.createSystemTopic(pulsar, TOPIC);

if (config.isSystemTopicAndTopicLevelPoliciesEnabled()) {
Long threshold = pulsar.getAdminClient().topicPolicies().getCompactionThreshold(TOPIC);
if (threshold == null || COMPACTION_THRESHOLD != threshold) {
pulsar.getAdminClient().topicPolicies().setCompactionThreshold(TOPIC, COMPACTION_THRESHOLD);
log.info("Set compaction threshold for system topic {}.", TOPIC);
}
}

producer = pulsar.getClient().newProducer(schema)
.enableBatching(true)
.compressionType(MSG_COMPRESSION_TYPE)
Expand Down Expand Up @@ -489,18 +481,21 @@ private CompletableFuture<Optional<String>> getActiveOwnerAsync(
String serviceUnit,
ServiceUnitState state,
Optional<String> owner) {
CompletableFuture<Optional<String>> activeOwner = owner.isPresent()
? brokerRegistry.lookupAsync(owner.get()).thenApply(lookupData -> lookupData.flatMap(__ -> owner))
: CompletableFuture.completedFuture(Optional.empty());

return activeOwner
.thenCompose(broker -> broker
.map(__ -> activeOwner)
.orElseGet(() -> deferGetOwnerRequest(serviceUnit).thenApply(Optional::ofNullable)))

return deferGetOwnerRequest(serviceUnit)
.thenCompose(newOwner -> brokerRegistry.lookupAsync(newOwner)
.thenApply(lookupData -> {
if (lookupData.isPresent()) {
return Optional.of(newOwner);
} else {
throw new IllegalStateException(
"The new owner " + newOwner + " is inactive.");
}
}))
.whenComplete((__, e) -> {
if (e != null) {
log.error("Failed to get active owner broker. serviceUnit:{}, state:{}, owner:{}",
serviceUnit, state, owner, e);
log.error("{} failed to get active owner broker. serviceUnit:{}, state:{}, owner:{}",
brokerId, serviceUnit, state, owner, e);
ownerLookUpCounters.get(state).getFailure().incrementAndGet();
}
});
Expand Down Expand Up @@ -541,6 +536,22 @@ public CompletableFuture<Optional<String>> getOwnerAsync(String serviceUnit) {
}
}

private Optional<String> getOwner(String serviceUnit) {
ServiceUnitStateData data = tableview.get(serviceUnit);
ServiceUnitState state = state(data);
switch (state) {
case Owned -> {
return Optional.of(data.dstBroker());
}
case Splitting -> {
return Optional.of(data.sourceBroker());
}
default -> {
return Optional.empty();
}
}
}

@Override
public Optional<String> getAssigned(String serviceUnit) {
if (!validateChannelState(Started, true)) {
Expand Down Expand Up @@ -729,7 +740,7 @@ private AtomicLong getHandlerCounter(ServiceUnitStateData data, boolean total) {

private void log(Throwable e, String serviceUnit, ServiceUnitStateData data, ServiceUnitStateData next) {
if (e == null) {
if (log.isDebugEnabled() || isTransferCommand(data)) {
if (debug() || isTransferCommand(data)) {
long handlerTotalCount = getHandlerTotalCounter(data).get();
long handlerFailureCount = getHandlerFailureCounter(data).get();
log.info("{} handled {} event for serviceUnit:{}, cur:{}, next:{}, "
Expand Down Expand Up @@ -768,6 +779,9 @@ private void handleSkippedEvent(String serviceUnit) {
private void handleOwnEvent(String serviceUnit, ServiceUnitStateData data) {
var getOwnerRequest = getOwnerRequests.remove(serviceUnit);
if (getOwnerRequest != null) {
if (debug()) {
log.info("Returned owner request for serviceUnit:{}", serviceUnit);
}
getOwnerRequest.complete(data.dstBroker());
}

Expand Down Expand Up @@ -892,26 +906,51 @@ private boolean isTargetBroker(String broker) {
}

private CompletableFuture<String> deferGetOwnerRequest(String serviceUnit) {

var requested = new MutableObject<CompletableFuture<String>>();
try {
return getOwnerRequests
.computeIfAbsent(serviceUnit, k -> {
CompletableFuture<String> future = new CompletableFuture<>();
var ownerBefore = getOwner(serviceUnit);
if (ownerBefore.isPresent()) {
// Here, we do a quick active check first with the computeIfAbsent lock
brokerRegistry.lookupAsync(ownerBefore.get()).getNow(Optional.empty())
.ifPresent(__ -> requested.setValue(
CompletableFuture.completedFuture(ownerBefore.get())));
if (requested.getValue() != null) {
return requested.getValue();
}
}
CompletableFuture<String> future =
new CompletableFuture<String>().orTimeout(inFlightStateWaitingTimeInMillis,
TimeUnit.MILLISECONDS)
.exceptionally(e -> {
var ownerAfter = getOwner(serviceUnit);
log.warn(
"{} failed to wait for owner for serviceUnit:{}; Trying to "
+ "return the current owner:{}",
brokerId, serviceUnit, ownerAfter, e);
if (ownerAfter.isEmpty()) {
throw new CompletionException(e);
}
return ownerAfter.get();
});
if (debug()) {
log.info("{} is waiting for owner for serviceUnit:{}", brokerId, serviceUnit);
}

requested.setValue(future);
return future;
});
} finally {
var future = requested.getValue();
if (future != null) {
future.orTimeout(inFlightStateWaitingTimeInMillis + 5 * 1000, TimeUnit.MILLISECONDS)
.whenComplete((v, e) -> {
if (e != null) {
getOwnerRequests.remove(serviceUnit, future);
log.warn("Failed to getOwner for serviceUnit:{}",
serviceUnit, e);
}
}
);
future.whenComplete((__, e) -> {
getOwnerRequests.remove(serviceUnit);
if (e != null) {
log.warn("{} failed to getOwner for serviceUnit:{}", brokerId, serviceUnit, e);
}
});
}
}
}
Expand Down
Loading

0 comments on commit 2fbf000

Please sign in to comment.