From 2f435cc31d695f043b1915cd79bfc1254c0fedbb Mon Sep 17 00:00:00 2001 From: horizonzy Date: Thu, 31 Aug 2023 01:02:54 +0800 Subject: [PATCH 1/7] Fix isolated group not work problem. --- ...IsolatedBookieEnsemblePlacementPolicy.java | 25 +++++++++++-------- 1 file changed, 14 insertions(+), 11 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java index 164677ca9c663..f15b0f65cc86f 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java @@ -27,7 +27,8 @@ import java.util.Map; import java.util.Optional; import java.util.Set; -import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException; import org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicy; @@ -47,6 +48,9 @@ import org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig; import org.apache.pulsar.metadata.api.MetadataCache; import org.apache.pulsar.metadata.api.MetadataStore; +import org.apache.pulsar.metadata.bookkeeper.AbstractMetadataDriver; +import org.apache.pulsar.zookeeper.ZkBookieRackAffinityMapping; +import org.apache.zookeeper.KeeperException; @Slf4j public class IsolatedBookieEnsemblePlacementPolicy extends RackawareEnsemblePlacementPolicy { @@ -182,22 +186,19 @@ private static Pair, Set> getIsolationGroup( private Set getExcludedBookiesWithIsolationGroups(int ensembleSize, Pair, Set> isolationGroups) { Set excludedBookies = new HashSet<>(); - if (isolationGroups != null && isolationGroups.getLeft().contains(PULSAR_SYSTEM_TOPIC_ISOLATION_GROUP)) { + if (isolationGroups != null && isolationGroups.getLeft().contains(PULSAR_SYSTEM_TOPIC_ISOLATION_GROUP)) { return excludedBookies; } try { if (bookieMappingCache != null) { - CompletableFuture> future = - bookieMappingCache.get(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH); + Optional optional = + bookieMappingCache.get(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH).get( + AbstractMetadataDriver.BLOCKING_CALL_TIMEOUT, TimeUnit.MILLISECONDS); - Optional optRes = (future.isDone() && !future.isCompletedExceptionally()) - ? future.join() : Optional.empty(); - - if (optRes.isEmpty()) { - return excludedBookies; + if (!optional.isPresent()) { + throw new KeeperException.NoNodeException(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH); } - - BookiesRackConfiguration allGroupsBookieMapping = optRes.get(); + BookiesRackConfiguration allGroupsBookieMapping = optional.get(); Set allBookies = allGroupsBookieMapping.keySet(); int totalAvailableBookiesInPrimaryGroup = 0; Set primaryIsolationGroup = Collections.emptySet(); @@ -266,6 +267,8 @@ private Set getExcludedBookiesWithIsolationGroups(int ensembleSize, } } } + } catch (TimeoutException e) { + log.warn("Getting bookie isolation info from metadata store timeout."); } catch (Exception e) { log.warn("Error getting bookie isolation info from metadata store: {}", e.getMessage()); } From 1f2126a0cf5450c2d41199c0901214a5016281f2 Mon Sep 17 00:00:00 2001 From: horizonzy Date: Thu, 31 Aug 2023 11:49:42 +0800 Subject: [PATCH 2/7] add tests. --- ...IsolatedBookieEnsemblePlacementPolicy.java | 9 ++- ...atedBookieEnsemblePlacementPolicyTest.java | 73 +++++++++++++++++++ 2 files changed, 80 insertions(+), 2 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java index f15b0f65cc86f..83b08868c53a8 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java @@ -19,6 +19,7 @@ package org.apache.pulsar.bookie.rackawareness; import static org.apache.pulsar.bookie.rackawareness.BookieRackAffinityMapping.METADATA_STORE_INSTANCE; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Sets; import io.netty.util.HashedWheelTimer; import java.util.Collections; @@ -63,6 +64,9 @@ public class IsolatedBookieEnsemblePlacementPolicy extends RackawareEnsemblePlac private MetadataCache bookieMappingCache; + @VisibleForTesting + long metaOpTimeout = AbstractMetadataDriver.BLOCKING_CALL_TIMEOUT; + private static final String PULSAR_SYSTEM_TOPIC_ISOLATION_GROUP = "*"; @@ -183,7 +187,8 @@ private static Pair, Set> getIsolationGroup( return pair; } - private Set getExcludedBookiesWithIsolationGroups(int ensembleSize, + @VisibleForTesting + Set getExcludedBookiesWithIsolationGroups(int ensembleSize, Pair, Set> isolationGroups) { Set excludedBookies = new HashSet<>(); if (isolationGroups != null && isolationGroups.getLeft().contains(PULSAR_SYSTEM_TOPIC_ISOLATION_GROUP)) { @@ -193,7 +198,7 @@ private Set getExcludedBookiesWithIsolationGroups(int ensembleSize, if (bookieMappingCache != null) { Optional optional = bookieMappingCache.get(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH).get( - AbstractMetadataDriver.BLOCKING_CALL_TIMEOUT, TimeUnit.MILLISECONDS); + metaOpTimeout, TimeUnit.MILLISECONDS); if (!optional.isPresent()) { throw new KeeperException.NoNodeException(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH); diff --git a/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicyTest.java b/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicyTest.java index f535ced08f731..856d70fb03f56 100644 --- a/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicyTest.java +++ b/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicyTest.java @@ -18,11 +18,14 @@ */ package org.apache.pulsar.bookie.rackawareness; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Sets; import io.netty.util.HashedWheelTimer; import java.nio.charset.StandardCharsets; import java.util.ArrayList; @@ -34,18 +37,23 @@ import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.CompletableFuture; import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException; import org.apache.bookkeeper.conf.ClientConfiguration; import org.apache.bookkeeper.feature.SettableFeatureProvider; import org.apache.bookkeeper.net.BookieId; import org.apache.bookkeeper.net.BookieSocketAddress; import org.apache.bookkeeper.stats.NullStatsLogger; +import org.apache.commons.lang3.tuple.MutablePair; import org.apache.pulsar.common.policies.data.BookieInfo; +import org.apache.pulsar.common.policies.data.BookiesRackConfiguration; import org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig; import org.apache.pulsar.common.util.ObjectMapperFactory; import org.apache.pulsar.metadata.api.MetadataStore; import org.apache.pulsar.metadata.api.MetadataStoreConfig; import org.apache.pulsar.metadata.api.MetadataStoreFactory; +import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; +import org.apache.pulsar.metadata.cache.impl.MetadataCacheImpl; import org.awaitility.Awaitility; import org.testng.Assert; import org.testng.annotations.AfterMethod; @@ -114,6 +122,71 @@ public void testNonRegionBookie() throws Exception { assertFalse(ensemble.contains(new BookieSocketAddress(BOOKIE4).toBookieId())); } + @Test + public void testMetadataStoreCases() { + Map mainBookieGroup = new HashMap<>(); + mainBookieGroup.put(BOOKIE1, BookieInfo.builder().rack("rack0").build()); + mainBookieGroup.put(BOOKIE2, BookieInfo.builder().rack("rack1").build()); + + Map secondaryBookieGroup = new HashMap<>(); + secondaryBookieGroup.put(BOOKIE3, BookieInfo.builder().rack("rack0").build()); + + store = mock(MetadataStoreExtended.class); + MetadataCacheImpl cache = mock(MetadataCacheImpl.class); + when(store.getMetadataCache(BookiesRackConfiguration.class)).thenReturn(cache); + CompletableFuture completableFuture = CompletableFuture.completedFuture(null); + long metaOpTimeout = 3000; + CompletableFuture> waitingCompleteFuture = new CompletableFuture<>(); + new Thread(() -> { + try { + Thread.sleep(metaOpTimeout - 1000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + BookiesRackConfiguration rackConfiguration = new BookiesRackConfiguration(); + rackConfiguration.put("group1", mainBookieGroup); + rackConfiguration.put("group2", secondaryBookieGroup); + waitingCompleteFuture.complete(Optional.of(rackConfiguration)); + }).start(); + + CompletableFuture> timeoutFuture = new CompletableFuture<>(); + new Thread(() -> { + try { + Thread.sleep(metaOpTimeout + 1000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + BookiesRackConfiguration rackConfiguration = new BookiesRackConfiguration(); + rackConfiguration.put("group1", mainBookieGroup); + rackConfiguration.put("group2", secondaryBookieGroup); + waitingCompleteFuture.complete(Optional.of(rackConfiguration)); + }).start(); + + + when(cache.get(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH)).thenReturn(completableFuture) + .thenReturn(waitingCompleteFuture).thenReturn(timeoutFuture); + + IsolatedBookieEnsemblePlacementPolicy isolationPolicy = new IsolatedBookieEnsemblePlacementPolicy(); + isolationPolicy.metaOpTimeout = metaOpTimeout; + ClientConfiguration bkClientConf = new ClientConfiguration(); + bkClientConf.setProperty(BookieRackAffinityMapping.METADATA_STORE_INSTANCE, store); + bkClientConf.setProperty(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS, isolationGroups); + isolationPolicy.initialize(bkClientConf, Optional.empty(), timer, SettableFeatureProvider.DISABLE_ALL, NullStatsLogger.INSTANCE, BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER); + isolationPolicy.onClusterChanged(writableBookies, readOnlyBookies); + + MutablePair, Set> groups = new MutablePair<>(); + groups.setLeft(Sets.newHashSet("group1")); + groups.setRight(new HashSet<>()); + + Set blacklist = + isolationPolicy.getExcludedBookiesWithIsolationGroups(2, groups); + assertFalse(blacklist.isEmpty()); + + blacklist = + isolationPolicy.getExcludedBookiesWithIsolationGroups(2, groups); + assertTrue(blacklist.isEmpty()); + } + @Test public void testBasic() throws Exception { Map> bookieMapping = new HashMap<>(); From 478a1db351a9d52e1950b2fe472bbb844af34cdf Mon Sep 17 00:00:00 2001 From: horizonzy Date: Fri, 1 Sep 2023 00:43:55 +0800 Subject: [PATCH 3/7] tune code. --- .../IsolatedBookieEnsemblePlacementPolicyTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicyTest.java b/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicyTest.java index 856d70fb03f56..0508fcb6ceb02 100644 --- a/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicyTest.java +++ b/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicyTest.java @@ -152,14 +152,14 @@ public void testMetadataStoreCases() { CompletableFuture> timeoutFuture = new CompletableFuture<>(); new Thread(() -> { try { - Thread.sleep(metaOpTimeout + 1000); + Thread.sleep(metaOpTimeout + 5000); } catch (InterruptedException e) { throw new RuntimeException(e); } BookiesRackConfiguration rackConfiguration = new BookiesRackConfiguration(); rackConfiguration.put("group1", mainBookieGroup); rackConfiguration.put("group2", secondaryBookieGroup); - waitingCompleteFuture.complete(Optional.of(rackConfiguration)); + timeoutFuture.complete(Optional.of(rackConfiguration)); }).start(); From ec10843baa43c706bc01fb9d2c3730a9ac95d4d5 Mon Sep 17 00:00:00 2001 From: horizonzy Date: Fri, 1 Sep 2023 23:43:14 +0800 Subject: [PATCH 4/7] cache the RackConfiguration to avoid sync operation. --- ...IsolatedBookieEnsemblePlacementPolicy.java | 37 +++++++------ ...atedBookieEnsemblePlacementPolicyTest.java | 52 +++++++++---------- 2 files changed, 47 insertions(+), 42 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java index 83b08868c53a8..7448d1418777c 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java @@ -28,8 +28,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; +import java.util.concurrent.CompletableFuture; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException; import org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicy; @@ -49,7 +48,6 @@ import org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig; import org.apache.pulsar.metadata.api.MetadataCache; import org.apache.pulsar.metadata.api.MetadataStore; -import org.apache.pulsar.metadata.bookkeeper.AbstractMetadataDriver; import org.apache.pulsar.zookeeper.ZkBookieRackAffinityMapping; import org.apache.zookeeper.KeeperException; @@ -64,11 +62,9 @@ public class IsolatedBookieEnsemblePlacementPolicy extends RackawareEnsemblePlac private MetadataCache bookieMappingCache; - @VisibleForTesting - long metaOpTimeout = AbstractMetadataDriver.BLOCKING_CALL_TIMEOUT; - private static final String PULSAR_SYSTEM_TOPIC_ISOLATION_GROUP = "*"; + private BookiesRackConfiguration cachedRackConfiguration = null; public IsolatedBookieEnsemblePlacementPolicy() { super(); @@ -94,7 +90,9 @@ public RackawareEnsemblePlacementPolicyImpl initialize(ClientConfiguration conf, } // Only add the bookieMappingCache if we have defined an isolation group bookieMappingCache = store.getMetadataCache(BookiesRackConfiguration.class); - bookieMappingCache.get(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH).join(); + Optional optional = + bookieMappingCache.get(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH).join(); + optional.ifPresent(bookiesRackConfiguration -> cachedRackConfiguration = bookiesRackConfiguration); } if (conf.getProperty(SECONDARY_ISOLATION_BOOKIE_GROUPS) != null) { String secondaryIsolationGroupsString = ConfigurationStringUtil @@ -196,14 +194,25 @@ Set getExcludedBookiesWithIsolationGroups(int ensembleSize, } try { if (bookieMappingCache != null) { - Optional optional = - bookieMappingCache.get(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH).get( - metaOpTimeout, TimeUnit.MILLISECONDS); + CompletableFuture> future = + bookieMappingCache.get(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH); + + BookiesRackConfiguration allGroupsBookieMapping; + Optional optRes = (future.isDone() && !future.isCompletedExceptionally()) + ? future.join() : Optional.empty(); - if (!optional.isPresent()) { - throw new KeeperException.NoNodeException(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH); + if (!optRes.isPresent()) { + if (cachedRackConfiguration != null) { + log.debug("The newest rack config is not available now, use the cached rack config : {}", + cachedRackConfiguration); + allGroupsBookieMapping = cachedRackConfiguration; + } else { + throw new KeeperException.NoNodeException(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH); + } + } else { + cachedRackConfiguration = optRes.get(); + allGroupsBookieMapping = optRes.get(); } - BookiesRackConfiguration allGroupsBookieMapping = optional.get(); Set allBookies = allGroupsBookieMapping.keySet(); int totalAvailableBookiesInPrimaryGroup = 0; Set primaryIsolationGroup = Collections.emptySet(); @@ -272,8 +281,6 @@ Set getExcludedBookiesWithIsolationGroups(int ensembleSize, } } } - } catch (TimeoutException e) { - log.warn("Getting bookie isolation info from metadata store timeout."); } catch (Exception e) { log.warn("Error getting bookie isolation info from metadata store: {}", e.getMessage()); } diff --git a/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicyTest.java b/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicyTest.java index 0508fcb6ceb02..6ebbc8b4590c1 100644 --- a/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicyTest.java +++ b/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicyTest.java @@ -123,7 +123,7 @@ public void testNonRegionBookie() throws Exception { } @Test - public void testMetadataStoreCases() { + public void testMetadataStoreCases() throws Exception { Map mainBookieGroup = new HashMap<>(); mainBookieGroup.put(BOOKIE1, BookieInfo.builder().rack("rack0").build()); mainBookieGroup.put(BOOKIE2, BookieInfo.builder().rack("rack1").build()); @@ -134,40 +134,31 @@ public void testMetadataStoreCases() { store = mock(MetadataStoreExtended.class); MetadataCacheImpl cache = mock(MetadataCacheImpl.class); when(store.getMetadataCache(BookiesRackConfiguration.class)).thenReturn(cache); - CompletableFuture completableFuture = CompletableFuture.completedFuture(null); - long metaOpTimeout = 3000; - CompletableFuture> waitingCompleteFuture = new CompletableFuture<>(); - new Thread(() -> { - try { - Thread.sleep(metaOpTimeout - 1000); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - BookiesRackConfiguration rackConfiguration = new BookiesRackConfiguration(); - rackConfiguration.put("group1", mainBookieGroup); - rackConfiguration.put("group2", secondaryBookieGroup); - waitingCompleteFuture.complete(Optional.of(rackConfiguration)); - }).start(); + CompletableFuture> initialFuture = new CompletableFuture<>(); + //The initialFuture only has group1. + BookiesRackConfiguration rackConfiguration1 = new BookiesRackConfiguration(); + rackConfiguration1.put("group1", mainBookieGroup); + initialFuture.complete(Optional.of(rackConfiguration1)); - CompletableFuture> timeoutFuture = new CompletableFuture<>(); + long waitTime = 2000; + CompletableFuture> waitingCompleteFuture = new CompletableFuture<>(); new Thread(() -> { try { - Thread.sleep(metaOpTimeout + 5000); + Thread.sleep(waitTime); } catch (InterruptedException e) { throw new RuntimeException(e); } - BookiesRackConfiguration rackConfiguration = new BookiesRackConfiguration(); - rackConfiguration.put("group1", mainBookieGroup); - rackConfiguration.put("group2", secondaryBookieGroup); - timeoutFuture.complete(Optional.of(rackConfiguration)); + //The waitingCompleteFuture has group1 and group2. + BookiesRackConfiguration rackConfiguration2 = new BookiesRackConfiguration(); + rackConfiguration2.put("group1", mainBookieGroup); + rackConfiguration2.put("group2", secondaryBookieGroup); + waitingCompleteFuture.complete(Optional.of(rackConfiguration2)); }).start(); - - when(cache.get(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH)).thenReturn(completableFuture) - .thenReturn(waitingCompleteFuture).thenReturn(timeoutFuture); + when(cache.get(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH)).thenReturn(initialFuture) + .thenReturn(waitingCompleteFuture); IsolatedBookieEnsemblePlacementPolicy isolationPolicy = new IsolatedBookieEnsemblePlacementPolicy(); - isolationPolicy.metaOpTimeout = metaOpTimeout; ClientConfiguration bkClientConf = new ClientConfiguration(); bkClientConf.setProperty(BookieRackAffinityMapping.METADATA_STORE_INSTANCE, store); bkClientConf.setProperty(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS, isolationGroups); @@ -178,13 +169,20 @@ public void testMetadataStoreCases() { groups.setLeft(Sets.newHashSet("group1")); groups.setRight(new HashSet<>()); + //The future is waiting done, so use the cached rack config. Set blacklist = isolationPolicy.getExcludedBookiesWithIsolationGroups(2, groups); - assertFalse(blacklist.isEmpty()); + assertTrue(blacklist.isEmpty()); + Thread.sleep(waitTime); + + //The future is already done, use the newest rack config. blacklist = isolationPolicy.getExcludedBookiesWithIsolationGroups(2, groups); - assertTrue(blacklist.isEmpty()); + assertFalse(blacklist.isEmpty()); + assertEquals(blacklist.size(), 1); + BookieId excludeBookie = blacklist.iterator().next(); + assertEquals(excludeBookie.toString(), BOOKIE3); } @Test From 301cb9026561e84b3cb7b64ef357719220333b2a Mon Sep 17 00:00:00 2001 From: horizonzy Date: Mon, 4 Sep 2023 18:53:22 +0800 Subject: [PATCH 5/7] Address the comments. --- ...IsolatedBookieEnsemblePlacementPolicy.java | 37 +++++++--------- ...atedBookieEnsemblePlacementPolicyTest.java | 43 +++++++++++++++++-- 2 files changed, 55 insertions(+), 25 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java index 7448d1418777c..a893069e3ec2d 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java @@ -64,7 +64,7 @@ public class IsolatedBookieEnsemblePlacementPolicy extends RackawareEnsemblePlac private static final String PULSAR_SYSTEM_TOPIC_ISOLATION_GROUP = "*"; - private BookiesRackConfiguration cachedRackConfiguration = null; + private volatile BookiesRackConfiguration cachedRackConfiguration = null; public IsolatedBookieEnsemblePlacementPolicy() { super(); @@ -90,9 +90,12 @@ public RackawareEnsemblePlacementPolicyImpl initialize(ClientConfiguration conf, } // Only add the bookieMappingCache if we have defined an isolation group bookieMappingCache = store.getMetadataCache(BookiesRackConfiguration.class); - Optional optional = - bookieMappingCache.get(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH).join(); - optional.ifPresent(bookiesRackConfiguration -> cachedRackConfiguration = bookiesRackConfiguration); + bookieMappingCache.get(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH).thenAccept(opt -> opt.ifPresent( + bookiesRackConfiguration -> cachedRackConfiguration = bookiesRackConfiguration)) + .exceptionally(e -> { + log.warn("Failed to load bookies rack configuration while initialize the PlacementPolicy."); + return null; + }); } if (conf.getProperty(SECONDARY_ISOLATION_BOOKIE_GROUPS) != null) { String secondaryIsolationGroupsString = ConfigurationStringUtil @@ -194,24 +197,16 @@ Set getExcludedBookiesWithIsolationGroups(int ensembleSize, } try { if (bookieMappingCache != null) { - CompletableFuture> future = - bookieMappingCache.get(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH); + bookieMappingCache.get(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH) + .thenAccept(opt -> cachedRackConfiguration = opt.orElse(null)).exceptionally(e -> { + log.warn("Failed to update the newest bookies rack config."); + return null; + }); - BookiesRackConfiguration allGroupsBookieMapping; - Optional optRes = (future.isDone() && !future.isCompletedExceptionally()) - ? future.join() : Optional.empty(); - - if (!optRes.isPresent()) { - if (cachedRackConfiguration != null) { - log.debug("The newest rack config is not available now, use the cached rack config : {}", - cachedRackConfiguration); - allGroupsBookieMapping = cachedRackConfiguration; - } else { - throw new KeeperException.NoNodeException(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH); - } - } else { - cachedRackConfiguration = optRes.get(); - allGroupsBookieMapping = optRes.get(); + BookiesRackConfiguration allGroupsBookieMapping = cachedRackConfiguration; + if (allGroupsBookieMapping == null) { + log.debug("The bookies rack config is not available at now."); + return excludedBookies; } Set allBookies = allGroupsBookieMapping.keySet(); int totalAvailableBookiesInPrimaryGroup = 0; diff --git a/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicyTest.java b/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicyTest.java index 6ebbc8b4590c1..28cc6e3e05f3a 100644 --- a/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicyTest.java +++ b/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicyTest.java @@ -155,8 +155,23 @@ public void testMetadataStoreCases() throws Exception { waitingCompleteFuture.complete(Optional.of(rackConfiguration2)); }).start(); - when(cache.get(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH)).thenReturn(initialFuture) - .thenReturn(waitingCompleteFuture); + long longWaitTime = 4000; + CompletableFuture> emptyFuture = new CompletableFuture<>(); + new Thread(() -> { + try { + Thread.sleep(longWaitTime); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + //The emptyFuture means that the zk node /bookies already be removed. + emptyFuture.complete(Optional.empty()); + }).start(); + + //Return different future means that cache expire. + when(cache.get(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH)) + .thenReturn(initialFuture).thenReturn(initialFuture) + .thenReturn(waitingCompleteFuture).thenReturn(waitingCompleteFuture) + .thenReturn(emptyFuture).thenReturn(emptyFuture); IsolatedBookieEnsemblePlacementPolicy isolationPolicy = new IsolatedBookieEnsemblePlacementPolicy(); ClientConfiguration bkClientConf = new ClientConfiguration(); @@ -169,20 +184,40 @@ public void testMetadataStoreCases() throws Exception { groups.setLeft(Sets.newHashSet("group1")); groups.setRight(new HashSet<>()); - //The future is waiting done, so use the cached rack config. + //initialFuture, the future is waiting done. Set blacklist = isolationPolicy.getExcludedBookiesWithIsolationGroups(2, groups); assertTrue(blacklist.isEmpty()); + //waitingCompleteFuture, the future is waiting done. + blacklist = + isolationPolicy.getExcludedBookiesWithIsolationGroups(2, groups); + assertTrue(blacklist.isEmpty()); + Thread.sleep(waitTime); - //The future is already done, use the newest rack config. + //waitingCompleteFuture, the future is already done. blacklist = isolationPolicy.getExcludedBookiesWithIsolationGroups(2, groups); assertFalse(blacklist.isEmpty()); assertEquals(blacklist.size(), 1); BookieId excludeBookie = blacklist.iterator().next(); assertEquals(excludeBookie.toString(), BOOKIE3); + + //emptyFuture, the future is waiting done. + blacklist = + isolationPolicy.getExcludedBookiesWithIsolationGroups(2, groups); + assertFalse(blacklist.isEmpty()); + assertEquals(blacklist.size(), 1); + excludeBookie = blacklist.iterator().next(); + assertEquals(excludeBookie.toString(), BOOKIE3); + + Thread.sleep(longWaitTime - waitTime); + + //emptyFuture, the future is already done. + blacklist = + isolationPolicy.getExcludedBookiesWithIsolationGroups(2, groups); + assertTrue(blacklist.isEmpty()); } @Test From c79568eb75671e9890e7f01d1505407c1e04fb07 Mon Sep 17 00:00:00 2001 From: horizonzy Date: Mon, 4 Sep 2023 19:07:47 +0800 Subject: [PATCH 6/7] Fix checkstyle. --- .../rackawareness/IsolatedBookieEnsemblePlacementPolicy.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java index a893069e3ec2d..02ddea9487469 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java @@ -28,7 +28,6 @@ import java.util.Map; import java.util.Optional; import java.util.Set; -import java.util.concurrent.CompletableFuture; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException; import org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicy; @@ -48,8 +47,6 @@ import org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig; import org.apache.pulsar.metadata.api.MetadataCache; import org.apache.pulsar.metadata.api.MetadataStore; -import org.apache.pulsar.zookeeper.ZkBookieRackAffinityMapping; -import org.apache.zookeeper.KeeperException; @Slf4j public class IsolatedBookieEnsemblePlacementPolicy extends RackawareEnsemblePlacementPolicy { From c033ac088d530f513daff43c4e7753df0d6b27e9 Mon Sep 17 00:00:00 2001 From: horizonzy Date: Tue, 5 Sep 2023 01:36:02 +0800 Subject: [PATCH 7/7] Fix ci. --- ...IsolatedBookieEnsemblePlacementPolicyTest.java | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicyTest.java b/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicyTest.java index 28cc6e3e05f3a..beb00197e4e9a 100644 --- a/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicyTest.java +++ b/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicyTest.java @@ -127,9 +127,10 @@ public void testMetadataStoreCases() throws Exception { Map mainBookieGroup = new HashMap<>(); mainBookieGroup.put(BOOKIE1, BookieInfo.builder().rack("rack0").build()); mainBookieGroup.put(BOOKIE2, BookieInfo.builder().rack("rack1").build()); + mainBookieGroup.put(BOOKIE3, BookieInfo.builder().rack("rack1").build()); + mainBookieGroup.put(BOOKIE4, BookieInfo.builder().rack("rack0").build()); Map secondaryBookieGroup = new HashMap<>(); - secondaryBookieGroup.put(BOOKIE3, BookieInfo.builder().rack("rack0").build()); store = mock(MetadataStoreExtended.class); MetadataCacheImpl cache = mock(MetadataCacheImpl.class); @@ -138,6 +139,7 @@ public void testMetadataStoreCases() throws Exception { //The initialFuture only has group1. BookiesRackConfiguration rackConfiguration1 = new BookiesRackConfiguration(); rackConfiguration1.put("group1", mainBookieGroup); + rackConfiguration1.put("group2", secondaryBookieGroup); initialFuture.complete(Optional.of(rackConfiguration1)); long waitTime = 2000; @@ -150,8 +152,15 @@ public void testMetadataStoreCases() throws Exception { } //The waitingCompleteFuture has group1 and group2. BookiesRackConfiguration rackConfiguration2 = new BookiesRackConfiguration(); - rackConfiguration2.put("group1", mainBookieGroup); - rackConfiguration2.put("group2", secondaryBookieGroup); + Map mainBookieGroup2 = new HashMap<>(); + mainBookieGroup2.put(BOOKIE1, BookieInfo.builder().rack("rack0").build()); + mainBookieGroup2.put(BOOKIE2, BookieInfo.builder().rack("rack1").build()); + mainBookieGroup2.put(BOOKIE4, BookieInfo.builder().rack("rack0").build()); + + Map secondaryBookieGroup2 = new HashMap<>(); + secondaryBookieGroup2.put(BOOKIE3, BookieInfo.builder().rack("rack0").build()); + rackConfiguration2.put("group1", mainBookieGroup2); + rackConfiguration2.put("group2", secondaryBookieGroup2); waitingCompleteFuture.complete(Optional.of(rackConfiguration2)); }).start();