Skip to content

Commit

Permalink
[fix] [broker] Fix isolated group not work problem. (#21096)
Browse files Browse the repository at this point in the history
When upgraded the pulsar version from 2.9.2 to 2.10.3, and the isolated group feature not work anymore.
Finally, we found the problem. In IsolatedBookieEnsemblePlacementPolicy, when it gets the bookie rack from the metadata store cache, uses future.isDone() to avoid sync operation. If the future is incomplete, return empty blacklists.
The cache may expire due to the caffeine cache `getExpireAfterWriteMillis` config, if the cache expires, the future may be incomplete. (#21095 will correct the behavior)

In 2.9.2, it uses the sync to get data from the metadata store, we should also keep the behavior.
  • Loading branch information
horizonzy authored and Technoboy- committed Sep 11, 2023
1 parent b104369 commit 635b5e5
Show file tree
Hide file tree
Showing 2 changed files with 134 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pulsar.bookie.rackawareness;

import static org.apache.pulsar.bookie.rackawareness.BookieRackAffinityMapping.METADATA_STORE_INSTANCE;
import com.google.common.annotations.VisibleForTesting;
import io.netty.util.HashedWheelTimer;
import java.util.Arrays;
import java.util.Collections;
Expand All @@ -27,7 +28,6 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
import org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicy;
Expand Down Expand Up @@ -61,6 +61,7 @@ public class IsolatedBookieEnsemblePlacementPolicy extends RackawareEnsemblePlac

private static final String PULSAR_SYSTEM_TOPIC_ISOLATION_GROUP = "*";

private volatile BookiesRackConfiguration cachedRackConfiguration = null;

public IsolatedBookieEnsemblePlacementPolicy() {
super();
Expand All @@ -86,7 +87,12 @@ public RackawareEnsemblePlacementPolicyImpl initialize(ClientConfiguration conf,
}
// Only add the bookieMappingCache if we have defined an isolation group
bookieMappingCache = store.getMetadataCache(BookiesRackConfiguration.class);
bookieMappingCache.get(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH).join();
bookieMappingCache.get(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH).thenAccept(opt -> opt.ifPresent(
bookiesRackConfiguration -> cachedRackConfiguration = bookiesRackConfiguration))
.exceptionally(e -> {
log.warn("Failed to load bookies rack configuration while initialize the PlacementPolicy.");
return null;
});
}
if (conf.getProperty(SECONDARY_ISOLATION_BOOKIE_GROUPS) != null) {
String secondaryIsolationGroupsString = ConfigurationStringUtil
Expand Down Expand Up @@ -179,25 +185,26 @@ private static Pair<Set<String>, Set<String>> getIsolationGroup(
return pair;
}

private Set<BookieId> getExcludedBookiesWithIsolationGroups(int ensembleSize,
@VisibleForTesting
Set<BookieId> getExcludedBookiesWithIsolationGroups(int ensembleSize,
Pair<Set<String>, Set<String>> isolationGroups) {
Set<BookieId> excludedBookies = new HashSet<>();
if (isolationGroups != null && isolationGroups.getLeft().contains(PULSAR_SYSTEM_TOPIC_ISOLATION_GROUP)) {
if (isolationGroups != null && isolationGroups.getLeft().contains(PULSAR_SYSTEM_TOPIC_ISOLATION_GROUP)) {
return excludedBookies;
}
try {
if (bookieMappingCache != null) {
CompletableFuture<Optional<BookiesRackConfiguration>> future =
bookieMappingCache.get(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH);
bookieMappingCache.get(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH)
.thenAccept(opt -> cachedRackConfiguration = opt.orElse(null)).exceptionally(e -> {
log.warn("Failed to update the newest bookies rack config.");
return null;
});

Optional<BookiesRackConfiguration> optRes = (future.isDone() && !future.isCompletedExceptionally())
? future.join() : Optional.empty();

if (optRes.isEmpty()) {
BookiesRackConfiguration allGroupsBookieMapping = cachedRackConfiguration;
if (allGroupsBookieMapping == null) {
log.debug("The bookies rack config is not available at now.");
return excludedBookies;
}

BookiesRackConfiguration allGroupsBookieMapping = optRes.get();
Set<String> allBookies = allGroupsBookieMapping.keySet();
int totalAvailableBookiesInPrimaryGroup = 0;
Set<String> primaryIsolationGroup = Collections.emptySet();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,14 @@
*/
package org.apache.pulsar.bookie.rackawareness;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Sets;
import io.netty.util.HashedWheelTimer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
Expand All @@ -34,18 +37,23 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.feature.SettableFeatureProvider;
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.pulsar.common.policies.data.BookieInfo;
import org.apache.pulsar.common.policies.data.BookiesRackConfiguration;
import org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.MetadataStoreFactory;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.metadata.cache.impl.MetadataCacheImpl;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
Expand Down Expand Up @@ -114,6 +122,113 @@ public void testNonRegionBookie() throws Exception {
assertFalse(ensemble.contains(new BookieSocketAddress(BOOKIE4).toBookieId()));
}

@Test
public void testMetadataStoreCases() throws Exception {
Map<String, BookieInfo> mainBookieGroup = new HashMap<>();
mainBookieGroup.put(BOOKIE1, BookieInfo.builder().rack("rack0").build());
mainBookieGroup.put(BOOKIE2, BookieInfo.builder().rack("rack1").build());
mainBookieGroup.put(BOOKIE3, BookieInfo.builder().rack("rack1").build());
mainBookieGroup.put(BOOKIE4, BookieInfo.builder().rack("rack0").build());

Map<String, BookieInfo> secondaryBookieGroup = new HashMap<>();

store = mock(MetadataStoreExtended.class);
MetadataCacheImpl cache = mock(MetadataCacheImpl.class);
when(store.getMetadataCache(BookiesRackConfiguration.class)).thenReturn(cache);
CompletableFuture<Optional<BookiesRackConfiguration>> initialFuture = new CompletableFuture<>();
//The initialFuture only has group1.
BookiesRackConfiguration rackConfiguration1 = new BookiesRackConfiguration();
rackConfiguration1.put("group1", mainBookieGroup);
rackConfiguration1.put("group2", secondaryBookieGroup);
initialFuture.complete(Optional.of(rackConfiguration1));

long waitTime = 2000;
CompletableFuture<Optional<BookiesRackConfiguration>> waitingCompleteFuture = new CompletableFuture<>();
new Thread(() -> {
try {
Thread.sleep(waitTime);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
//The waitingCompleteFuture has group1 and group2.
BookiesRackConfiguration rackConfiguration2 = new BookiesRackConfiguration();
Map<String, BookieInfo> mainBookieGroup2 = new HashMap<>();
mainBookieGroup2.put(BOOKIE1, BookieInfo.builder().rack("rack0").build());
mainBookieGroup2.put(BOOKIE2, BookieInfo.builder().rack("rack1").build());
mainBookieGroup2.put(BOOKIE4, BookieInfo.builder().rack("rack0").build());

Map<String, BookieInfo> secondaryBookieGroup2 = new HashMap<>();
secondaryBookieGroup2.put(BOOKIE3, BookieInfo.builder().rack("rack0").build());
rackConfiguration2.put("group1", mainBookieGroup2);
rackConfiguration2.put("group2", secondaryBookieGroup2);
waitingCompleteFuture.complete(Optional.of(rackConfiguration2));
}).start();

long longWaitTime = 4000;
CompletableFuture<Optional<BookiesRackConfiguration>> emptyFuture = new CompletableFuture<>();
new Thread(() -> {
try {
Thread.sleep(longWaitTime);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
//The emptyFuture means that the zk node /bookies already be removed.
emptyFuture.complete(Optional.empty());
}).start();

//Return different future means that cache expire.
when(cache.get(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH))
.thenReturn(initialFuture).thenReturn(initialFuture)
.thenReturn(waitingCompleteFuture).thenReturn(waitingCompleteFuture)
.thenReturn(emptyFuture).thenReturn(emptyFuture);

IsolatedBookieEnsemblePlacementPolicy isolationPolicy = new IsolatedBookieEnsemblePlacementPolicy();
ClientConfiguration bkClientConf = new ClientConfiguration();
bkClientConf.setProperty(BookieRackAffinityMapping.METADATA_STORE_INSTANCE, store);
bkClientConf.setProperty(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS, isolationGroups);
isolationPolicy.initialize(bkClientConf, Optional.empty(), timer, SettableFeatureProvider.DISABLE_ALL, NullStatsLogger.INSTANCE, BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER);
isolationPolicy.onClusterChanged(writableBookies, readOnlyBookies);

MutablePair<Set<String>, Set<String>> groups = new MutablePair<>();
groups.setLeft(Sets.newHashSet("group1"));
groups.setRight(new HashSet<>());

//initialFuture, the future is waiting done.
Set<BookieId> blacklist =
isolationPolicy.getExcludedBookiesWithIsolationGroups(2, groups);
assertTrue(blacklist.isEmpty());

//waitingCompleteFuture, the future is waiting done.
blacklist =
isolationPolicy.getExcludedBookiesWithIsolationGroups(2, groups);
assertTrue(blacklist.isEmpty());

Thread.sleep(waitTime);

//waitingCompleteFuture, the future is already done.
blacklist =
isolationPolicy.getExcludedBookiesWithIsolationGroups(2, groups);
assertFalse(blacklist.isEmpty());
assertEquals(blacklist.size(), 1);
BookieId excludeBookie = blacklist.iterator().next();
assertEquals(excludeBookie.toString(), BOOKIE3);

//emptyFuture, the future is waiting done.
blacklist =
isolationPolicy.getExcludedBookiesWithIsolationGroups(2, groups);
assertFalse(blacklist.isEmpty());
assertEquals(blacklist.size(), 1);
excludeBookie = blacklist.iterator().next();
assertEquals(excludeBookie.toString(), BOOKIE3);

Thread.sleep(longWaitTime - waitTime);

//emptyFuture, the future is already done.
blacklist =
isolationPolicy.getExcludedBookiesWithIsolationGroups(2, groups);
assertTrue(blacklist.isEmpty());
}

@Test
public void testBasic() throws Exception {
Map<String, Map<String, BookieInfo>> bookieMapping = new HashMap<>();
Expand Down

0 comments on commit 635b5e5

Please sign in to comment.