diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java index 581183cf95ad32..31816ead650199 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java @@ -887,19 +887,40 @@ private void monitor() { // Monitor role // Periodically check the role in case ZK watcher fails. var isChannelOwner = serviceUnitStateChannel.isChannelOwner(); + if (isChannelOwner) { if (role != Leader) { log.warn("Current role:{} does not match with the channel ownership:{}. " + "Playing the leader role.", role, isChannelOwner); playLeader(); + } else { + if (!topBundlesLoadDataStore.isConnected()) { + log.warn("Leader's topBundlesLoadDataStore is disconnected. Restarting it."); + topBundlesLoadDataStore.init(); + } + if (!brokerLoadDataStore.isConnected()) { + log.warn("Leader's brokerLoadDataStore is disconnected. Restarting it."); + brokerLoadDataStore.init(); + } } } else { if (role != Follower) { log.warn("Current role:{} does not match with the channel ownership:{}. " + "Playing the follower role.", role, isChannelOwner); playFollower(); + } else { + if (!topBundlesLoadDataStore.isConnected()) { + log.warn("Follower's topBundlesLoadDataStore is disconnected. Restarting it."); + topBundlesLoadDataStore.close(); + topBundlesLoadDataStore.startProducer(); + } + if (!brokerLoadDataStore.isConnected()) { + log.warn("Follower's brokerLoadDataStore is disconnected. Restarting it."); + brokerLoadDataStore.init(); + } } } + } catch (Throwable e) { log.error("Failed to get the channel ownership.", e); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStore.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStore.java index a7deeeaad8a5cf..cdb91614e234b1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStore.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStore.java @@ -103,4 +103,9 @@ public interface LoadDataStore extends Closeable { */ void startProducer() throws LoadDataStoreException; + /** + * Check if this store is connected. + */ + boolean isConnected(); + } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java index ead0a7081fd375..5e811cac47d74b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java @@ -128,6 +128,17 @@ public void startProducer() throws LoadDataStoreException { } } + @Override + public boolean isConnected() { + if (producer != null) { + return producer.isConnected(); + } + + // TODO: Currently, table view does not expose isConnected. + // Consider adding tableview.isConnected() in the future + return tableView != null; + } + @Override public void close() throws IOException { if (producer != null) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerFilterTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerFilterTestBase.java index a120ef473e9a5e..1dc4a121cdc918 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerFilterTestBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerFilterTestBase.java @@ -109,6 +109,11 @@ public void startTableView() throws LoadDataStoreException { public void startProducer() throws LoadDataStoreException { } + + @Override + public boolean isConnected() { + return true; + } }; configuration.setPreferLaterVersions(true); doReturn(configuration).when(mockContext).brokerConfiguration(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java index 4eec6124777582..440abf766e8d6a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java @@ -402,6 +402,11 @@ public void startTableView() throws LoadDataStoreException { public void startProducer() throws LoadDataStoreException { } + + @Override + public boolean isConnected() { + return true; + } }; var topBundleLoadDataStore = new LoadDataStore() { @@ -470,6 +475,11 @@ public void startTableView() throws LoadDataStoreException { public void startProducer() throws LoadDataStoreException { } + + @Override + public boolean isConnected() { + return true; + } }; BrokerRegistry brokerRegistry = mock(BrokerRegistry.class); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/LeastResourceUsageWithWeightTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/LeastResourceUsageWithWeightTest.java index b1e09bf2f3afb2..343c54b0085fb1 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/LeastResourceUsageWithWeightTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/LeastResourceUsageWithWeightTest.java @@ -271,6 +271,11 @@ public void startTableView() throws LoadDataStoreException { public void startProducer() throws LoadDataStoreException { } + + @Override + public boolean isConnected() { + return true; + } }; doReturn(conf).when(ctx).brokerConfiguration(); diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java index bbb92f6a6cf4ce..945df93ec968fb 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java @@ -266,9 +266,11 @@ public void testStopBroker() throws Exception { } } - String broker1 = admin.lookups().lookupTopic(topicName); + Awaitility.waitAtMost(5, TimeUnit.SECONDS).untilAsserted(() -> { + String broker1 = admin.lookups().lookupTopic(topicName); + assertNotEquals(broker1, broker); + }); - assertNotEquals(broker1, broker); } @Test(timeOut = 40 * 1000) @@ -308,7 +310,7 @@ public void testAntiaffinityPolicy() throws PulsarAdminException { assertEquals(result.size(), NUM_BROKERS); } - @Test(timeOut = 40 * 1000) + @Test(timeOut = 40 * 1000, invocationCount = 30) public void testIsolationPolicy() throws Exception { final String namespaceIsolationPolicyName = "my-isolation-policy"; final String isolationEnabledNameSpace = DEFAULT_TENANT + "/my-isolation-policy" + nsSuffix; @@ -351,7 +353,12 @@ public void testIsolationPolicy() throws Exception { //expected when retried } - String broker = admin.lookups().lookupTopic(topic); + Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> { + String broker = admin.lookups().lookupTopic(topic); + // This isolated topic should be assigned to the primary broker, broker-0 + assertEquals(extractBrokerIndex(broker), 0); + }); + for (BrokerContainer container : pulsarCluster.getBrokers()) { String name = container.getHostName(); @@ -360,13 +367,11 @@ public void testIsolationPolicy() throws Exception { } } - assertEquals(extractBrokerIndex(broker), 0); - - broker = admin.lookups().lookupTopic(topic); - - final String brokerName = broker; - retryStrategically((test) -> extractBrokerIndex(brokerName) == 1, 100, 200); - assertEquals(extractBrokerIndex(broker), 1); + Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> { + String broker = admin.lookups().lookupTopic(topic); + // This isolated topic should be assigned to the secondary broker, broker-1 + assertEquals(extractBrokerIndex(broker), 1); + }); for (BrokerContainer container : pulsarCluster.getBrokers()) { String name = container.getHostName(); @@ -374,14 +379,17 @@ public void testIsolationPolicy() throws Exception { container.stop(); } } - try { - admin.lookups().lookupTopic(topic); - fail(); - } catch (Exception ex) { - log.error("Failed to lookup topic: ", ex); - assertThat(ex.getMessage()).containsAnyOf("Failed to look up a broker", - "Failed to select the new owner broker for bundle"); - } + + Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> { + // This isolated topic cannot be assigned to the remaining broker, broker-2 + try { + String broker = admin.lookups().lookupTopic(topic); + log.warn("looked up broker {}. retrying...", broker); + } catch (Exception ex) { + assertThat(ex.getMessage()).containsAnyOf("Failed to look up a broker", + "Failed to select the new owner broker for bundle"); + } + }); } private String getBrokerUrl(int index) {