Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix] [broker] Fix isolated group not work problem. #21096

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pulsar.bookie.rackawareness;

import static org.apache.pulsar.bookie.rackawareness.BookieRackAffinityMapping.METADATA_STORE_INSTANCE;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Sets;
import io.netty.util.HashedWheelTimer;
import java.util.Collections;
Expand All @@ -27,7 +28,6 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
import org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicy;
Expand Down Expand Up @@ -61,6 +61,7 @@ public class IsolatedBookieEnsemblePlacementPolicy extends RackawareEnsemblePlac

private static final String PULSAR_SYSTEM_TOPIC_ISOLATION_GROUP = "*";

private volatile BookiesRackConfiguration cachedRackConfiguration = null;

public IsolatedBookieEnsemblePlacementPolicy() {
super();
Expand All @@ -86,7 +87,12 @@ public RackawareEnsemblePlacementPolicyImpl initialize(ClientConfiguration conf,
}
// Only add the bookieMappingCache if we have defined an isolation group
bookieMappingCache = store.getMetadataCache(BookiesRackConfiguration.class);
bookieMappingCache.get(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH).join();
bookieMappingCache.get(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH).thenAccept(opt -> opt.ifPresent(
bookiesRackConfiguration -> cachedRackConfiguration = bookiesRackConfiguration))
.exceptionally(e -> {
log.warn("Failed to load bookies rack configuration while initialize the PlacementPolicy.");
return null;
});
}
if (conf.getProperty(SECONDARY_ISOLATION_BOOKIE_GROUPS) != null) {
String secondaryIsolationGroupsString = ConfigurationStringUtil
Expand Down Expand Up @@ -179,25 +185,26 @@ private static Pair<Set<String>, Set<String>> getIsolationGroup(
return pair;
}

private Set<BookieId> getExcludedBookiesWithIsolationGroups(int ensembleSize,
@VisibleForTesting
Set<BookieId> getExcludedBookiesWithIsolationGroups(int ensembleSize,
Pair<Set<String>, Set<String>> isolationGroups) {
Set<BookieId> excludedBookies = new HashSet<>();
if (isolationGroups != null && isolationGroups.getLeft().contains(PULSAR_SYSTEM_TOPIC_ISOLATION_GROUP)) {
if (isolationGroups != null && isolationGroups.getLeft().contains(PULSAR_SYSTEM_TOPIC_ISOLATION_GROUP)) {
return excludedBookies;
}
try {
if (bookieMappingCache != null) {
CompletableFuture<Optional<BookiesRackConfiguration>> future =
bookieMappingCache.get(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH);
bookieMappingCache.get(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH)
.thenAccept(opt -> cachedRackConfiguration = opt.orElse(null)).exceptionally(e -> {
log.warn("Failed to update the newest bookies rack config.");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Change the log msg to log.warn("Failed to update the newest bookies rack config, and use the cached rack configuration: {}", cachedRackConfiguration)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an async operation, it won't affect the result of the current invocation, and printing this log may be misleading.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

make sense

return null;
});

Optional<BookiesRackConfiguration> optRes = (future.isDone() && !future.isCompletedExceptionally())
? future.join() : Optional.empty();

if (optRes.isEmpty()) {
BookiesRackConfiguration allGroupsBookieMapping = cachedRackConfiguration;
if (allGroupsBookieMapping == null) {
log.debug("The bookies rack config is not available at now.");
return excludedBookies;
}

BookiesRackConfiguration allGroupsBookieMapping = optRes.get();
Set<String> allBookies = allGroupsBookieMapping.keySet();
int totalAvailableBookiesInPrimaryGroup = 0;
Set<String> primaryIsolationGroup = Collections.emptySet();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,14 @@
*/
package org.apache.pulsar.bookie.rackawareness;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Sets;
import io.netty.util.HashedWheelTimer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
Expand All @@ -34,18 +37,23 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.feature.SettableFeatureProvider;
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.pulsar.common.policies.data.BookieInfo;
import org.apache.pulsar.common.policies.data.BookiesRackConfiguration;
import org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.MetadataStoreFactory;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.metadata.cache.impl.MetadataCacheImpl;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
Expand Down Expand Up @@ -114,6 +122,113 @@ public void testNonRegionBookie() throws Exception {
assertFalse(ensemble.contains(new BookieSocketAddress(BOOKIE4).toBookieId()));
}

@Test
public void testMetadataStoreCases() throws Exception {
Map<String, BookieInfo> mainBookieGroup = new HashMap<>();
mainBookieGroup.put(BOOKIE1, BookieInfo.builder().rack("rack0").build());
mainBookieGroup.put(BOOKIE2, BookieInfo.builder().rack("rack1").build());
mainBookieGroup.put(BOOKIE3, BookieInfo.builder().rack("rack1").build());
mainBookieGroup.put(BOOKIE4, BookieInfo.builder().rack("rack0").build());

Map<String, BookieInfo> secondaryBookieGroup = new HashMap<>();

store = mock(MetadataStoreExtended.class);
MetadataCacheImpl cache = mock(MetadataCacheImpl.class);
when(store.getMetadataCache(BookiesRackConfiguration.class)).thenReturn(cache);
CompletableFuture<Optional<BookiesRackConfiguration>> initialFuture = new CompletableFuture<>();
//The initialFuture only has group1.
BookiesRackConfiguration rackConfiguration1 = new BookiesRackConfiguration();
rackConfiguration1.put("group1", mainBookieGroup);
rackConfiguration1.put("group2", secondaryBookieGroup);
initialFuture.complete(Optional.of(rackConfiguration1));

long waitTime = 2000;
CompletableFuture<Optional<BookiesRackConfiguration>> waitingCompleteFuture = new CompletableFuture<>();
new Thread(() -> {
try {
Thread.sleep(waitTime);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
//The waitingCompleteFuture has group1 and group2.
BookiesRackConfiguration rackConfiguration2 = new BookiesRackConfiguration();
Map<String, BookieInfo> mainBookieGroup2 = new HashMap<>();
mainBookieGroup2.put(BOOKIE1, BookieInfo.builder().rack("rack0").build());
mainBookieGroup2.put(BOOKIE2, BookieInfo.builder().rack("rack1").build());
mainBookieGroup2.put(BOOKIE4, BookieInfo.builder().rack("rack0").build());

Map<String, BookieInfo> secondaryBookieGroup2 = new HashMap<>();
secondaryBookieGroup2.put(BOOKIE3, BookieInfo.builder().rack("rack0").build());
rackConfiguration2.put("group1", mainBookieGroup2);
rackConfiguration2.put("group2", secondaryBookieGroup2);
waitingCompleteFuture.complete(Optional.of(rackConfiguration2));
}).start();

long longWaitTime = 4000;
CompletableFuture<Optional<BookiesRackConfiguration>> emptyFuture = new CompletableFuture<>();
new Thread(() -> {
try {
Thread.sleep(longWaitTime);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
//The emptyFuture means that the zk node /bookies already be removed.
emptyFuture.complete(Optional.empty());
}).start();

//Return different future means that cache expire.
when(cache.get(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH))
.thenReturn(initialFuture).thenReturn(initialFuture)
.thenReturn(waitingCompleteFuture).thenReturn(waitingCompleteFuture)
.thenReturn(emptyFuture).thenReturn(emptyFuture);

IsolatedBookieEnsemblePlacementPolicy isolationPolicy = new IsolatedBookieEnsemblePlacementPolicy();
ClientConfiguration bkClientConf = new ClientConfiguration();
bkClientConf.setProperty(BookieRackAffinityMapping.METADATA_STORE_INSTANCE, store);
bkClientConf.setProperty(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS, isolationGroups);
isolationPolicy.initialize(bkClientConf, Optional.empty(), timer, SettableFeatureProvider.DISABLE_ALL, NullStatsLogger.INSTANCE, BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER);
isolationPolicy.onClusterChanged(writableBookies, readOnlyBookies);

MutablePair<Set<String>, Set<String>> groups = new MutablePair<>();
groups.setLeft(Sets.newHashSet("group1"));
groups.setRight(new HashSet<>());

//initialFuture, the future is waiting done.
Set<BookieId> blacklist =
isolationPolicy.getExcludedBookiesWithIsolationGroups(2, groups);
assertTrue(blacklist.isEmpty());

//waitingCompleteFuture, the future is waiting done.
blacklist =
isolationPolicy.getExcludedBookiesWithIsolationGroups(2, groups);
assertTrue(blacklist.isEmpty());

Thread.sleep(waitTime);

//waitingCompleteFuture, the future is already done.
blacklist =
isolationPolicy.getExcludedBookiesWithIsolationGroups(2, groups);
assertFalse(blacklist.isEmpty());
assertEquals(blacklist.size(), 1);
BookieId excludeBookie = blacklist.iterator().next();
assertEquals(excludeBookie.toString(), BOOKIE3);

//emptyFuture, the future is waiting done.
blacklist =
isolationPolicy.getExcludedBookiesWithIsolationGroups(2, groups);
assertFalse(blacklist.isEmpty());
assertEquals(blacklist.size(), 1);
excludeBookie = blacklist.iterator().next();
assertEquals(excludeBookie.toString(), BOOKIE3);

Thread.sleep(longWaitTime - waitTime);

//emptyFuture, the future is already done.
blacklist =
isolationPolicy.getExcludedBookiesWithIsolationGroups(2, groups);
assertTrue(blacklist.isEmpty());
}

@Test
public void testBasic() throws Exception {
Map<String, Map<String, BookieInfo>> bookieMapping = new HashMap<>();
Expand Down
Loading