Skip to content

Commit

Permalink
[fix][broker] rackaware policy is ineffective when delete zk rack inf…
Browse files Browse the repository at this point in the history
…o after bkclient initialize (apache#20944)
  • Loading branch information
TakaHiR07 authored Oct 7, 2023
1 parent dbb1577 commit d9ebaf5
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,6 @@ public synchronized void setConf(Configuration conf) {
store.registerListener(this::handleUpdates);
racksWithHost = bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH).get()
.orElseGet(BookiesRackConfiguration::new);
updateRacksWithHost(racksWithHost);
watchAvailableBookies();
for (Map<String, BookieInfo> bookieMapping : racksWithHost.values()) {
for (String address : bookieMapping.keySet()) {
bookieAddressListLastTime.add(BookieId.parse(address));
Expand All @@ -132,6 +130,8 @@ public synchronized void setConf(Configuration conf) {
bookieAddressListLastTime);
}
}
updateRacksWithHost(racksWithHost);
watchAvailableBookies();
} catch (InterruptedException | ExecutionException | MetadataException e) {
throw new RuntimeException(METADATA_STORE_INSTANCE + " failed to init BookieId list");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ public void testWithPulsarRegistrationClient() throws Exception {
bkClientConf.getTimeoutTimerNumTicks());

RackawareEnsemblePlacementPolicy repp = new RackawareEnsemblePlacementPolicy();
mapping.registerRackChangeListener(repp);
Class<?> clazz1 = Class.forName("org.apache.bookkeeper.client.TopologyAwareEnsemblePlacementPolicy");
Field field1 = clazz1.getDeclaredField("knownBookies");
field1.setAccessible(true);
Expand Down Expand Up @@ -323,6 +324,22 @@ public void testWithPulsarRegistrationClient() throws Exception {
assertEquals(knownBookies.get(BOOKIE2.toBookieId()).getNetworkLocation(), "/rack1");
assertEquals(knownBookies.get(BOOKIE3.toBookieId()).getNetworkLocation(), "/default-rack");

//remove bookie2 rack, the bookie2 rack should be /default-rack
data = "{\"group1\": {\"" + BOOKIE1
+ "\": {\"rack\": \"/rack0\", \"hostname\": \"bookie1.example.com\"}}}";
store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, data.getBytes(), Optional.empty()).join();
Awaitility.await().atMost(30, TimeUnit.SECONDS).until(() -> ((BookiesRackConfiguration)field.get(mapping)).get("group1").size() == 1);

racks = mapping
.resolve(Lists.newArrayList(BOOKIE1.getHostName(), BOOKIE2.getHostName(), BOOKIE3.getHostName()))
.stream().filter(Objects::nonNull).toList();
assertEquals(racks.size(), 1);
assertEquals(racks.get(0), "/rack0");
assertEquals(knownBookies.size(), 3);
assertEquals(knownBookies.get(BOOKIE1.toBookieId()).getNetworkLocation(), "/rack0");
assertEquals(knownBookies.get(BOOKIE2.toBookieId()).getNetworkLocation(), "/default-rack");
assertEquals(knownBookies.get(BOOKIE3.toBookieId()).getNetworkLocation(), "/default-rack");

timer.stop();
}
}

0 comments on commit d9ebaf5

Please sign in to comment.