Skip to content

Commit

Permalink
[fix][broker] Update init and shutdown time and other minor logic (Ex…
Browse files Browse the repository at this point in the history
…tensibleLoadManagerImpl only) (apache#22930)

(cherry picked from commit aa8f696)
  • Loading branch information
heesung-sn committed Jun 26, 2024
1 parent 2cf6e51 commit 9b6156a
Show file tree
Hide file tree
Showing 6 changed files with 118 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -373,8 +373,8 @@ static void createTenantIfAbsent(PulsarResources resources, String tenant, Strin
}
}

static void createNamespaceIfAbsent(PulsarResources resources, NamespaceName namespaceName,
String cluster, int bundleNumber) throws IOException {
public static void createNamespaceIfAbsent(PulsarResources resources, NamespaceName namespaceName,
String cluster, int bundleNumber) throws IOException {
NamespaceResources namespaceResources = resources.getNamespaceResources();

if (!namespaceResources.namespaceExists(namespaceName)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -624,7 +624,9 @@ public CompletableFuture<Optional<BrokerLookupData>> getOwnershipWithLookupDataA

public CompletableFuture<Void> unloadNamespaceBundleAsync(ServiceUnitId bundle,
Optional<String> destinationBroker,
boolean force) {
boolean force,
long timeout,
TimeUnit timeoutUnit) {
if (NamespaceService.isSLAOrHeartbeatNamespace(bundle.getNamespaceObject().toString())) {
log.info("Skip unloading namespace bundle: {}.", bundle);
return CompletableFuture.completedFuture(null);
Expand All @@ -647,7 +649,7 @@ public CompletableFuture<Void> unloadNamespaceBundleAsync(ServiceUnitId bundle,
UnloadDecision unloadDecision =
new UnloadDecision(unload, UnloadDecision.Label.Success, UnloadDecision.Reason.Admin);
return unloadAsync(unloadDecision,
conf.getNamespaceBundleUnloadingTimeoutMs(), TimeUnit.MILLISECONDS);
timeout, timeoutUnit);
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,6 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {

private static final int OWNERSHIP_CLEAN_UP_MAX_WAIT_TIME_IN_MILLIS = 5000;
private static final int OWNERSHIP_CLEAN_UP_WAIT_RETRY_DELAY_IN_MILLIS = 100;
private static final int OWNERSHIP_CLEAN_UP_CONVERGENCE_DELAY_IN_MILLIS = 3000;
public static final long VERSION_ID_INIT = 1; // initial versionId
private static final long OWNERSHIP_MONITOR_DELAY_TIME_IN_SECS = 60;
public static final long MAX_CLEAN_UP_DELAY_TIME_IN_SECS = 3 * 60; // 3 mins
Expand Down Expand Up @@ -305,7 +304,8 @@ public synchronized void start() throws PulsarServerException {
}
}
PulsarClusterMetadataSetup.createNamespaceIfAbsent
(pulsar.getPulsarResources(), SYSTEM_NAMESPACE, config.getClusterName());
(pulsar.getPulsarResources(), SYSTEM_NAMESPACE, config.getClusterName(),
config.getDefaultNumberOfNamespaceBundles());

ExtensibleLoadManagerImpl.createSystemTopic(pulsar, TOPIC);

Expand Down Expand Up @@ -1316,11 +1316,6 @@ private void waitForCleanups(String broker, boolean excludeSystemTopics, int max
}
}
if (cleaned) {
try {
MILLISECONDS.sleep(OWNERSHIP_CLEAN_UP_CONVERGENCE_DELAY_IN_MILLIS);
} catch (InterruptedException e) {
log.warn("Interrupted while gracefully waiting for the cleanup convergence.");
}
break;
} else {
try {
Expand All @@ -1331,9 +1326,23 @@ private void waitForCleanups(String broker, boolean excludeSystemTopics, int max
}
}
}
log.info("Finished cleanup waiting for orphan broker:{}. Elapsed {} ms", brokerId,
System.currentTimeMillis() - started);
}

private synchronized void doCleanup(String broker) {
try {
if (getChannelOwnerAsync().get(MAX_CHANNEL_OWNER_ELECTION_WAITING_TIME_IN_SECS, TimeUnit.SECONDS)
.isEmpty()) {
log.error("Found the channel owner is empty. Skip the inactive broker:{}'s orphan bundle cleanup",
broker);
return;
}
} catch (Exception e) {
log.error("Failed to find the channel owner. Skip the inactive broker:{}'s orphan bundle cleanup", broker);
return;
}

long startTime = System.nanoTime();
log.info("Started ownership cleanup for the inactive broker:{}", broker);
int orphanServiceUnitCleanupCnt = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TableView;

Expand All @@ -44,6 +43,7 @@
public class TableViewLoadDataStoreImpl<T> implements LoadDataStore<T> {

private static final long LOAD_DATA_REPORT_UPDATE_MAX_INTERVAL_MULTIPLIER_BEFORE_RESTART = 2;
private static final long INIT_TIMEOUT_IN_SECS = 5;

private volatile TableView<T> tableView;
private volatile long tableViewLastUpdateTimestamp;
Expand Down Expand Up @@ -123,10 +123,11 @@ public synchronized void start() throws LoadDataStoreException {
public synchronized void startTableView() throws LoadDataStoreException {
if (tableView == null) {
try {
tableView = client.newTableViewBuilder(Schema.JSON(clazz)).topic(topic).create();
tableView = client.newTableViewBuilder(Schema.JSON(clazz)).topic(topic).createAsync()
.get(INIT_TIMEOUT_IN_SECS, TimeUnit.SECONDS);
tableView.forEachAndListen((k, v) ->
tableViewLastUpdateTimestamp = System.currentTimeMillis());
} catch (PulsarClientException e) {
} catch (Exception e) {
tableView = null;
throw new LoadDataStoreException(e);
}
Expand All @@ -137,8 +138,9 @@ public synchronized void startTableView() throws LoadDataStoreException {
public synchronized void startProducer() throws LoadDataStoreException {
if (producer == null) {
try {
producer = client.newProducer(Schema.JSON(clazz)).topic(topic).create();
} catch (PulsarClientException e) {
producer = client.newProducer(Schema.JSON(clazz)).topic(topic).createAsync()
.get(INIT_TIMEOUT_IN_SECS, TimeUnit.SECONDS);
} catch (Exception e) {
producer = null;
throw new LoadDataStoreException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -783,7 +783,7 @@ public CompletableFuture<Void> unloadNamespaceBundle(NamespaceBundle bundle,
boolean closeWithoutWaitingClientDisconnect) {
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
return ExtensibleLoadManagerImpl.get(loadManager.get())
.unloadNamespaceBundleAsync(bundle, destinationBroker, false);
.unloadNamespaceBundleAsync(bundle, destinationBroker, false, timeout, timeoutUnit);
}
// unload namespace bundle
OwnedBundle ob = ownershipCache.getOwnedBundle(bundle);
Expand Down Expand Up @@ -1233,7 +1233,8 @@ public CompletableFuture<Void> removeOwnedServiceUnitAsync(NamespaceBundle nsBun
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
ExtensibleLoadManagerImpl extensibleLoadManager = ExtensibleLoadManagerImpl.get(loadManager.get());
future = extensibleLoadManager.unloadNamespaceBundleAsync(
nsBundle, Optional.empty(), true);
nsBundle, Optional.empty(), true,
pulsar.getConfig().getNamespaceBundleUnloadingTimeoutMs(), TimeUnit.MILLISECONDS);
} else {
future = ownershipCache.removeOwnership(nsBundle);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.protocol;

import java.io.File;
import java.util.Optional;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.util.PortManager;
import org.apache.commons.io.FileUtils;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

@Slf4j
public class PulsarClientBasedHandlerTest {

private final static String clusterName = "cluster";
private final static int shutdownTimeoutMs = 100;
private final int zkPort = PortManager.nextFreePort();
private final LocalBookkeeperEnsemble bk = new LocalBookkeeperEnsemble(2, zkPort, PortManager::nextFreePort);
private File tempDirectory;
private PulsarService pulsar;

@BeforeClass
public void setup() throws Exception {
bk.start();
final var config = new ServiceConfiguration();
config.setClusterName(clusterName);
config.setAdvertisedAddress("localhost");
config.setBrokerServicePort(Optional.of(0));
config.setWebServicePort(Optional.of(0));
config.setMetadataStoreUrl("zk:127.0.0.1:" + zkPort);

tempDirectory = SimpleProtocolHandlerTestsBase.configureProtocolHandler(config,
PulsarClientBasedHandler.class.getName(), true);

config.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName());
config.setLoadBalancerDebugModeEnabled(true);
config.setBrokerShutdownTimeoutMs(shutdownTimeoutMs);

pulsar = new PulsarService(config);
pulsar.start();
}

@Test(timeOut = 30000)
public void testStopBroker() throws PulsarServerException {
final var beforeStop = System.currentTimeMillis();
final var handler = (PulsarClientBasedHandler) pulsar.getProtocolHandlers()
.protocol(PulsarClientBasedHandler.PROTOCOL);
pulsar.close();
final var elapsedMs = System.currentTimeMillis() - beforeStop;
log.info("It spends {} ms to stop the broker ({} for protocol handler)", elapsedMs, handler.closeTimeMs);
Assert.assertTrue(elapsedMs <
+ handler.closeTimeMs + shutdownTimeoutMs + 1000); // tolerate 1 more second for other processes
}

@AfterClass(alwaysRun = true)
public void cleanup() throws Exception {
bk.stop();
if (tempDirectory != null) {
FileUtils.deleteDirectory(tempDirectory);
}
}
}

0 comments on commit 9b6156a

Please sign in to comment.