Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Don't merge][cherry-pick][branch-2.11] Cherry-pick some PR to branch-2.11 #21427

Closed
wants to merge 11 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions distribution/server/src/assemble/LICENSE.bin.txt
Original file line number Diff line number Diff line change
Expand Up @@ -528,9 +528,9 @@ The Apache Software License, Version 2.0
- io.vertx-vertx-web-3.9.8.jar
- io.vertx-vertx-web-common-3.9.8.jar
* Apache ZooKeeper
- org.apache.zookeeper-zookeeper-3.8.1.jar
- org.apache.zookeeper-zookeeper-jute-3.8.1.jar
- org.apache.zookeeper-zookeeper-prometheus-metrics-3.8.1.jar
- org.apache.zookeeper-zookeeper-3.8.3.jar
- org.apache.zookeeper-zookeeper-jute-3.8.3.jar
- org.apache.zookeeper-zookeeper-prometheus-metrics-3.8.3.jar
* Snappy Java
- org.xerial.snappy-snappy-java-1.1.10.1.jar
* Google HTTP Client
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1221,6 +1221,10 @@ public CompletableFuture<Long> getEarliestMessagePublishTimeOfPos(PositionImpl p
}
PositionImpl nextPos = getNextValidPosition(pos);

if (nextPos.compareTo(lastConfirmedEntry) > 0) {
return CompletableFuture.completedFuture(-1L);
}

asyncReadEntry(nextPos, new ReadEntryCallback() {
@Override
public void readEntryComplete(Entry entry, Object ctx) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.bookkeeper.mledger.impl;

import static org.apache.pulsar.common.util.PortManager.releaseLockedPort;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
Expand Down Expand Up @@ -117,7 +118,7 @@ public void testBookieFailure() throws Exception {
metadataStore.unsetAlwaysFail();

bkc = new BookKeeperTestClient(baseClientConf);
startNewBookie();
int port = startNewBookie();

// Reconnect a new bk client
factory.shutdown();
Expand Down Expand Up @@ -147,6 +148,7 @@ public void testBookieFailure() throws Exception {
assertEquals("entry-2", new String(entries.get(0).getData()));
entries.forEach(Entry::release);
factory.shutdown();
releaseLockedPort(port);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
package org.apache.bookkeeper.test;

import static org.apache.bookkeeper.util.BookKeeperConstants.AVAILABLE_NODE;
import static org.apache.pulsar.common.util.PortManager.nextLockedFreePort;
import static org.testng.Assert.assertFalse;

import com.google.common.base.Stopwatch;
Expand Down Expand Up @@ -62,7 +63,7 @@
import org.apache.bookkeeper.proto.BookieServer;
import org.apache.bookkeeper.replication.Auditor;
import org.apache.bookkeeper.replication.ReplicationWorker;
import org.apache.bookkeeper.util.PortManager;
import org.apache.pulsar.common.util.PortManager;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.metadata.impl.FaultInjectionMetadataStore;
Expand Down Expand Up @@ -113,6 +114,7 @@ public void handleTestMethodName(Method method) {

private boolean isAutoRecoveryEnabled;
protected ExecutorService executor;
private final List<Integer> bookiePorts = new ArrayList<>();

SynchronousQueue<Throwable> asyncExceptions = new SynchronousQueue<>();
protected void captureThrowable(Runnable c) {
Expand Down Expand Up @@ -264,7 +266,7 @@ protected void startBKCluster(String metadataServiceUri) throws Exception {

// Create Bookie Servers (B1, B2, B3)
for (int i = 0; i < numBookies; i++) {
startNewBookie();
bookiePorts.add(startNewBookie());
}
}

Expand All @@ -283,14 +285,15 @@ protected void stopBKCluster() throws Exception {
t.shutdown();
}
servers.clear();
bookiePorts.removeIf(PortManager::releaseLockedPort);
}

protected ServerConfiguration newServerConfiguration() throws Exception {
File f = tmpDirs.createNew("bookie", "test");

int port;
if (baseConf.isEnableLocalTransport() || !baseConf.getAllowEphemeralPorts()) {
port = PortManager.nextFreePort();
port = nextLockedFreePort();
} else {
port = 0;
}
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ flexible messaging model and an intuitive client API.</description>
<commons-compress.version>1.21</commons-compress.version>

<bookkeeper.version>4.15.4</bookkeeper.version>
<zookeeper.version>3.8.1</zookeeper.version>
<zookeeper.version>3.8.3</zookeeper.version>
<commons-cli.version>1.5.0</commons-cli.version>
<commons-text.version>1.10.0</commons-text.version>
<snappy.version>1.1.10.1</snappy.version> <!-- ZooKeeper server -->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,6 @@ public synchronized void setConf(Configuration conf) {
store.registerListener(this::handleUpdates);
racksWithHost = bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH).get()
.orElseGet(BookiesRackConfiguration::new);
updateRacksWithHost(racksWithHost);
watchAvailableBookies();
for (Map<String, BookieInfo> bookieMapping : racksWithHost.values()) {
for (String address : bookieMapping.keySet()) {
bookieAddressListLastTime.add(BookieId.parse(address));
Expand All @@ -131,6 +129,8 @@ public synchronized void setConf(Configuration conf) {
bookieAddressListLastTime);
}
}
updateRacksWithHost(racksWithHost);
watchAvailableBookies();
} catch (InterruptedException | ExecutionException | MetadataException e) {
throw new RuntimeException(METADATA_STORE_INSTANCE + " failed to init BookieId list");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ public void testWithPulsarRegistrationClient() throws Exception {
bkClientConf.getTimeoutTimerNumTicks());

RackawareEnsemblePlacementPolicy repp = new RackawareEnsemblePlacementPolicy();
mapping.registerRackChangeListener(repp);
Class<?> clazz1 = Class.forName("org.apache.bookkeeper.client.TopologyAwareEnsemblePlacementPolicy");
Field field1 = clazz1.getDeclaredField("knownBookies");
field1.setAccessible(true);
Expand Down Expand Up @@ -323,6 +324,22 @@ public void testWithPulsarRegistrationClient() throws Exception {
assertEquals(knownBookies.get(BOOKIE2.toBookieId()).getNetworkLocation(), "/rack1");
assertEquals(knownBookies.get(BOOKIE3.toBookieId()).getNetworkLocation(), "/default-rack");

//remove bookie2 rack, the bookie2 rack should be /default-rack
data = "{\"group1\": {\"" + BOOKIE1
+ "\": {\"rack\": \"/rack0\", \"hostname\": \"bookie1.example.com\"}}}";
store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, data.getBytes(), Optional.empty()).join();
Awaitility.await().atMost(30, TimeUnit.SECONDS).until(() -> ((BookiesRackConfiguration)field.get(mapping)).get("group1").size() == 1);

racks = mapping
.resolve(Lists.newArrayList(BOOKIE1.getHostName(), BOOKIE2.getHostName(), BOOKIE3.getHostName()))
.stream().filter(Objects::nonNull).toList();
assertEquals(racks.size(), 1);
assertEquals(racks.get(0), "/rack0");
assertEquals(knownBookies.size(), 3);
assertEquals(knownBookies.get(BOOKIE1.toBookieId()).getNetworkLocation(), "/rack0");
assertEquals(knownBookies.get(BOOKIE2.toBookieId()).getNetworkLocation(), "/default-rack");
assertEquals(knownBookies.get(BOOKIE3.toBookieId()).getNetworkLocation(), "/default-rack");

timer.stop();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -221,14 +221,17 @@ static void setDefaultEnsemblePlacementPolicy(
}
}

private void setEnsemblePlacementPolicy(ClientConfiguration bkConf, ServiceConfiguration conf, MetadataStore store,
static void setEnsemblePlacementPolicy(ClientConfiguration bkConf, ServiceConfiguration conf, MetadataStore store,
Class<? extends EnsemblePlacementPolicy> policyClass) {
bkConf.setEnsemblePlacementPolicy(policyClass);
bkConf.setProperty(BookieRackAffinityMapping.METADATA_STORE_INSTANCE, store);
if (conf.isBookkeeperClientRackawarePolicyEnabled() || conf.isBookkeeperClientRegionawarePolicyEnabled()) {
bkConf.setProperty(REPP_DNS_RESOLVER_CLASS, conf.getProperties().getProperty(REPP_DNS_RESOLVER_CLASS,
BookieRackAffinityMapping.class.getName()));

bkConf.setMinNumRacksPerWriteQuorum(conf.getBookkeeperClientMinNumRacksPerWriteQuorum());
bkConf.setEnforceMinNumRacksPerWriteQuorum(conf.isBookkeeperClientEnforceMinNumRacksPerWriteQuorum());

bkConf.setProperty(NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY,
conf.getProperties().getProperty(
NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
Expand Down Expand Up @@ -176,8 +176,8 @@ public class ModularLoadManagerImpl implements ModularLoadManager {
// Pulsar service used to initialize this.
private PulsarService pulsar;

// Executor service used to regularly update broker data.
private final ScheduledExecutorService scheduler;
// Executor service used to update broker data.
private final ExecutorService executors;

// check if given broker can load persistent/non-persistent topic
private final BrokerTopicLoadingPredicate brokerTopicLoadingPredicate;
Expand Down Expand Up @@ -216,7 +216,7 @@ public ModularLoadManagerImpl() {
loadData = new LoadData();
loadSheddingPipeline = new ArrayList<>();
preallocatedBundleToBroker = new ConcurrentHashMap<>();
scheduler = Executors.newSingleThreadScheduledExecutor(
executors = Executors.newSingleThreadExecutor(
new ExecutorProvider.ExtendedThreadFactory("pulsar-modular-load-manager"));
this.brokerToFailureDomainMap = new HashMap<>();

Expand Down Expand Up @@ -277,7 +277,7 @@ public void initialize(final PulsarService pulsar) {
// register listeners for domain changes
pulsar.getPulsarResources().getClusterResources().getFailureDomainResources()
.registerListener(__ -> {
scheduler.execute(() -> refreshBrokerToFailureDomainMap());
executors.execute(() -> refreshBrokerToFailureDomainMap());
});

loadSheddingPipeline.add(createLoadSheddingStrategy());
Expand All @@ -291,7 +291,7 @@ public void handleDataNotification(Notification t) {
});

try {
scheduler.execute(ModularLoadManagerImpl.this::updateAll);
executors.execute(ModularLoadManagerImpl.this::updateAll);
} catch (RejectedExecutionException e) {
// Executor is shutting down
}
Expand Down Expand Up @@ -1019,7 +1019,7 @@ public void start() throws PulsarServerException {
*/
@Override
public void stop() throws PulsarServerException {
scheduler.shutdownNow();
executors.shutdownNow();

try {
brokersData.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,10 @@ public void handleExceededBacklogQuota(PersistentTopic persistentTopic, BacklogQ
break;
case producer_exception:
case producer_request_hold:
disconnectProducers(persistentTopic);
if (!advanceSlowestSystemCursor(persistentTopic)) {
// The slowest is not a system cursor. Disconnecting producers to put backpressure.
disconnectProducers(persistentTopic);
}
break;
default:
break;
Expand Down Expand Up @@ -268,4 +271,27 @@ private void disconnectProducers(PersistentTopic persistentTopic) {

});
}

/**
* Advances the slowest cursor if that is a system cursor.
*
* @param persistentTopic
* @return true if the slowest cursor is a system cursor
*/
private boolean advanceSlowestSystemCursor(PersistentTopic persistentTopic) {

ManagedLedgerImpl mLedger = (ManagedLedgerImpl) persistentTopic.getManagedLedger();
ManagedCursor slowestConsumer = mLedger.getSlowestConsumer();
if (slowestConsumer == null) {
return false;
}

if (PersistentTopic.isDedupCursorName(slowestConsumer.getName())) {
persistentTopic.getMessageDeduplication().takeSnapshot();
return true;
}

// We may need to check other system cursors here : replicator, compaction
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -92,20 +92,23 @@ public SystemTopicBasedTopicPoliciesService(PulsarService pulsarService) {

@Override
public CompletableFuture<Void> deleteTopicPoliciesAsync(TopicName topicName) {
if (NamespaceService.isHeartbeatNamespace(topicName.getNamespaceObject())) {
return CompletableFuture.completedFuture(null);
}
return sendTopicPolicyEvent(topicName, ActionType.DELETE, null);
}

@Override
public CompletableFuture<Void> updateTopicPoliciesAsync(TopicName topicName, TopicPolicies policies) {
if (NamespaceService.isHeartbeatNamespace(topicName.getNamespaceObject())) {
return CompletableFuture.failedFuture(new BrokerServiceException.NotAllowedException(
"Not allowed to update topic policy for the heartbeat topic"));
}
return sendTopicPolicyEvent(topicName, ActionType.UPDATE, policies);
}

private CompletableFuture<Void> sendTopicPolicyEvent(TopicName topicName, ActionType actionType,
TopicPolicies policies) {
if (NamespaceService.isHeartbeatNamespace(topicName.getNamespaceObject())) {
return CompletableFuture.failedFuture(
new BrokerServiceException.NotAllowedException("Not allowed to send event to health check topic"));
}
return pulsarService.getPulsarResources().getNamespaceResources()
.getPoliciesAsync(topicName.getNamespaceObject())
.thenCompose(namespacePolicies -> {
Expand Down Expand Up @@ -217,6 +220,9 @@ public TopicPolicies getTopicPolicies(TopicName topicName) throws TopicPoliciesC
@Override
public TopicPolicies getTopicPolicies(TopicName topicName,
boolean isGlobal) throws TopicPoliciesCacheNotInitException {
if (NamespaceService.isHeartbeatNamespace(topicName.getNamespaceObject())) {
return null;
}
if (!policyCacheInitMap.containsKey(topicName.getNamespaceObject())) {
NamespaceName namespace = topicName.getNamespaceObject();
prepareInitPoliciesCache(namespace, new CompletableFuture<>());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.MarkDeleteCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenCursorCallback;
Expand Down Expand Up @@ -132,6 +133,9 @@ public MessageDupUnknownException() {

private final String replicatorPrefix;


private final AtomicBoolean snapshotTaking = new AtomicBoolean(false);

public MessageDeduplication(PulsarService pulsar, PersistentTopic topic, ManagedLedger managedLedger) {
this.pulsar = pulsar;
this.topic = topic;
Expand Down Expand Up @@ -432,6 +436,11 @@ private void takeSnapshot(Position position) {
if (log.isDebugEnabled()) {
log.debug("[{}] Taking snapshot of sequence ids map", topic.getName());
}

if (!snapshotTaking.compareAndSet(false, true)) {
return;
}

Map<String, Long> snapshot = new TreeMap<>();
highestSequencedPersisted.forEach((producerName, sequenceId) -> {
if (snapshot.size() < maxNumberOfProducers) {
Expand All @@ -446,11 +455,13 @@ public void markDeleteComplete(Object ctx) {
log.debug("[{}] Stored new deduplication snapshot at {}", topic.getName(), position);
}
lastSnapshotTimestamp = System.currentTimeMillis();
snapshotTaking.set(false);
}

@Override
public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
log.warn("[{}] Failed to store new deduplication snapshot at {}", topic.getName(), position);
snapshotTaking.set(false);
}
}, null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1154,16 +1154,20 @@ public SubscriptionStatsImpl getStats(Boolean getPreciseBacklog, boolean subscri
subStats.backlogSize = ((ManagedLedgerImpl) topic.getManagedLedger())
.getEstimatedBacklogSize((PositionImpl) cursor.getMarkDeletedPosition());
}
if (getEarliestTimeInBacklog && subStats.msgBacklog > 0) {
ManagedLedgerImpl managedLedger = ((ManagedLedgerImpl) cursor.getManagedLedger());
PositionImpl markDeletedPosition = (PositionImpl) cursor.getMarkDeletedPosition();
long result = 0;
try {
result = managedLedger.getEarliestMessagePublishTimeOfPos(markDeletedPosition).get();
} catch (InterruptedException | ExecutionException e) {
result = -1;
if (getEarliestTimeInBacklog) {
if (subStats.msgBacklog > 0) {
ManagedLedgerImpl managedLedger = ((ManagedLedgerImpl) cursor.getManagedLedger());
PositionImpl markDeletedPosition = (PositionImpl) cursor.getMarkDeletedPosition();
long result = 0;
try {
result = managedLedger.getEarliestMessagePublishTimeOfPos(markDeletedPosition).get();
} catch (InterruptedException | ExecutionException e) {
result = -1;
}
subStats.earliestMsgPublishTimeInBacklog = result;
} else {
subStats.earliestMsgPublishTimeInBacklog = -1;
}
subStats.earliestMsgPublishTimeInBacklog = result;
}
subStats.msgBacklogNoDelayed = subStats.msgBacklog - subStats.msgDelayed;
subStats.msgRateExpired = expiryMonitor.getMessageExpiryRate();
Expand Down
Loading
Loading