From d9ebaf5bf6fda44d21ac24cec7dbe208b59dc597 Mon Sep 17 00:00:00 2001 From: ken <1647023764@qq.com> Date: Sat, 7 Oct 2023 20:24:45 +0800 Subject: [PATCH] [fix][broker] rackaware policy is ineffective when delete zk rack info after bkclient initialize (#20944) --- .../BookieRackAffinityMapping.java | 4 ++-- .../BookieRackAffinityMappingTest.java | 17 +++++++++++++++++ 2 files changed, 19 insertions(+), 2 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java index e9e350800b44e..d54ef2a5f4cef 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java @@ -121,8 +121,6 @@ public synchronized void setConf(Configuration conf) { store.registerListener(this::handleUpdates); racksWithHost = bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH).get() .orElseGet(BookiesRackConfiguration::new); - updateRacksWithHost(racksWithHost); - watchAvailableBookies(); for (Map bookieMapping : racksWithHost.values()) { for (String address : bookieMapping.keySet()) { bookieAddressListLastTime.add(BookieId.parse(address)); @@ -132,6 +130,8 @@ public synchronized void setConf(Configuration conf) { bookieAddressListLastTime); } } + updateRacksWithHost(racksWithHost); + watchAvailableBookies(); } catch (InterruptedException | ExecutionException | MetadataException e) { throw new RuntimeException(METADATA_STORE_INSTANCE + " failed to init BookieId list"); } diff --git a/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMappingTest.java b/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMappingTest.java index d7be7dabd0db1..d7df5afb4bebe 100644 --- a/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMappingTest.java +++ b/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMappingTest.java @@ -254,6 +254,7 @@ public void testWithPulsarRegistrationClient() throws Exception { bkClientConf.getTimeoutTimerNumTicks()); RackawareEnsemblePlacementPolicy repp = new RackawareEnsemblePlacementPolicy(); + mapping.registerRackChangeListener(repp); Class clazz1 = Class.forName("org.apache.bookkeeper.client.TopologyAwareEnsemblePlacementPolicy"); Field field1 = clazz1.getDeclaredField("knownBookies"); field1.setAccessible(true); @@ -323,6 +324,22 @@ public void testWithPulsarRegistrationClient() throws Exception { assertEquals(knownBookies.get(BOOKIE2.toBookieId()).getNetworkLocation(), "/rack1"); assertEquals(knownBookies.get(BOOKIE3.toBookieId()).getNetworkLocation(), "/default-rack"); + //remove bookie2 rack, the bookie2 rack should be /default-rack + data = "{\"group1\": {\"" + BOOKIE1 + + "\": {\"rack\": \"/rack0\", \"hostname\": \"bookie1.example.com\"}}}"; + store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, data.getBytes(), Optional.empty()).join(); + Awaitility.await().atMost(30, TimeUnit.SECONDS).until(() -> ((BookiesRackConfiguration)field.get(mapping)).get("group1").size() == 1); + + racks = mapping + .resolve(Lists.newArrayList(BOOKIE1.getHostName(), BOOKIE2.getHostName(), BOOKIE3.getHostName())) + .stream().filter(Objects::nonNull).toList(); + assertEquals(racks.size(), 1); + assertEquals(racks.get(0), "/rack0"); + assertEquals(knownBookies.size(), 3); + assertEquals(knownBookies.get(BOOKIE1.toBookieId()).getNetworkLocation(), "/rack0"); + assertEquals(knownBookies.get(BOOKIE2.toBookieId()).getNetworkLocation(), "/default-rack"); + assertEquals(knownBookies.get(BOOKIE3.toBookieId()).getNetworkLocation(), "/default-rack"); + timer.stop(); } }