From 63ea69d300036055c97b9d929dafd79979476fb1 Mon Sep 17 00:00:00 2001 From: Heesung Sohn Date: Tue, 19 Dec 2023 16:03:50 -0800 Subject: [PATCH] [fix][broker] Fixed the ExtensibleLoadManagerImpl internal system topic getTopic failure when the leadership changes --- .../extensions/ExtensibleLoadManagerImpl.java | 112 +++++++++--------- .../extensions/store/LoadDataStore.java | 17 +++ .../store/TableViewLoadDataStoreImpl.java | 28 ++++- .../ExtensibleLoadManagerImplTest.java | 16 +-- .../filter/BrokerFilterTestBase.java | 15 +++ .../scheduler/TransferShedderTest.java | 30 +++++ .../extensions/store/LoadDataStoreTest.java | 3 + .../LeastResourceUsageWithWeightTest.java | 15 +++ .../ExtensibleLoadManagerTest.java | 47 +++++--- 9 files changed, 200 insertions(+), 83 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java index ae1a4e606bbe0f..745bece301d23c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java @@ -297,13 +297,18 @@ public static void createSystemTopic(PulsarService pulsar, String topic) throws log.info("Created topic {}.", topic); } catch (PulsarAdminException.ConflictException ex) { if (debug(pulsar.getConfiguration(), log)) { - log.info("Topic {} already exists.", topic, ex); + log.info("Topic {} already exists.", topic); } } catch (PulsarAdminException e) { throw new PulsarServerException(e); } } + private static void createSystemTopics(PulsarService pulsar) throws PulsarServerException { + createSystemTopic(pulsar, BROKER_LOAD_DATA_STORE_TOPIC); + createSystemTopic(pulsar, TOP_BUNDLES_LOAD_DATA_STORE_TOPIC); + } + /** * Gets the assigned broker for the given topic. * @param pulsar PulsarService instance @@ -370,13 +375,9 @@ public void start() throws PulsarServerException { this.isolationPoliciesHelper = new IsolationPoliciesHelper(policies); this.brokerFilterPipeline.add(new BrokerIsolationPoliciesFilter(isolationPoliciesHelper)); - createSystemTopic(pulsar, BROKER_LOAD_DATA_STORE_TOPIC); - createSystemTopic(pulsar, TOP_BUNDLES_LOAD_DATA_STORE_TOPIC); - try { this.brokerLoadDataStore = LoadDataStoreFactory .create(pulsar.getClient(), BROKER_LOAD_DATA_STORE_TOPIC, BrokerLoadData.class); - this.brokerLoadDataStore.startTableView(); this.topBundlesLoadDataStore = LoadDataStoreFactory .create(pulsar.getClient(), TOP_BUNDLES_LOAD_DATA_STORE_TOPIC, TopBundlesLoadData.class); } catch (LoadDataStoreException e) { @@ -431,7 +432,6 @@ public void start() throws PulsarServerException { this.unloadScheduler = new UnloadScheduler( pulsar, pulsar.getLoadManagerExecutor(), unloadManager, context, serviceUnitStateChannel, unloadCounter, unloadMetrics); - this.unloadScheduler.start(); this.splitScheduler = new SplitScheduler( pulsar, serviceUnitStateChannel, splitManager, splitCounter, splitMetrics, context); this.splitScheduler.start(); @@ -789,74 +789,70 @@ public static boolean isInternalTopic(String topic) { @VisibleForTesting void playLeader() { - if (role != Leader) { - log.info("This broker:{} is changing the role from {} to {}", - pulsar.getLookupServiceAddress(), role, Leader); - int retry = 0; - while (true) { + log.info("This broker:{} is setting the role from {} to {}", + pulsar.getLookupServiceAddress(), role, Leader); + int retry = 0; + while (true) { + try { + initWaiter.await(); + // Confirm the system topics have been created or create them if they do not exist. + // If the leader has changed, the new leader need to reset + // the local brokerService.topics (by this topic creations). + // Otherwise, the system topic existence check will fail on the leader broker. + createSystemTopics(pulsar); + brokerLoadDataStore.init(); + topBundlesLoadDataStore.init(); + unloadScheduler.start(); + serviceUnitStateChannel.scheduleOwnershipMonitor(); + break; + } catch (Throwable e) { + log.error("The broker:{} failed to set the role. Retrying {} th ...", + pulsar.getLookupServiceAddress(), ++retry, e); try { - initWaiter.await(); - serviceUnitStateChannel.scheduleOwnershipMonitor(); - topBundlesLoadDataStore.startTableView(); - unloadScheduler.start(); - break; - } catch (Throwable e) { - log.error("The broker:{} failed to change the role. Retrying {} th ...", - pulsar.getLookupServiceAddress(), ++retry, e); - try { - Thread.sleep(Math.min(retry * 10, MAX_ROLE_CHANGE_RETRY_DELAY_IN_MILLIS)); - } catch (InterruptedException ex) { - log.warn("Interrupted while sleeping."); - } + Thread.sleep(Math.min(retry * 10, MAX_ROLE_CHANGE_RETRY_DELAY_IN_MILLIS)); + } catch (InterruptedException ex) { + log.warn("Interrupted while sleeping."); } } - role = Leader; - log.info("This broker:{} plays the leader now.", pulsar.getLookupServiceAddress()); } + role = Leader; + log.info("This broker:{} plays the leader now.", pulsar.getLookupServiceAddress()); // flush the load data when the leader is elected. - if (brokerLoadDataReporter != null) { - brokerLoadDataReporter.reportAsync(true); - } - if (topBundleLoadDataReporter != null) { - topBundleLoadDataReporter.reportAsync(true); - } + brokerLoadDataReporter.reportAsync(true); + topBundleLoadDataReporter.reportAsync(true); } @VisibleForTesting void playFollower() { - if (role != Follower) { - log.info("This broker:{} is changing the role from {} to {}", - pulsar.getLookupServiceAddress(), role, Follower); - int retry = 0; - while (true) { + log.info("This broker:{} is setting the role from {} to {}", + pulsar.getLookupServiceAddress(), role, Follower); + int retry = 0; + while (true) { + try { + initWaiter.await(); + unloadScheduler.close(); + serviceUnitStateChannel.cancelOwnershipMonitor(); + brokerLoadDataStore.init(); + topBundlesLoadDataStore.close(); + topBundlesLoadDataStore.startProducer(); + break; + } catch (Throwable e) { + log.error("The broker:{} failed to set the role. Retrying {} th ...", + pulsar.getLookupServiceAddress(), ++retry, e); try { - initWaiter.await(); - serviceUnitStateChannel.cancelOwnershipMonitor(); - topBundlesLoadDataStore.closeTableView(); - unloadScheduler.close(); - break; - } catch (Throwable e) { - log.error("The broker:{} failed to change the role. Retrying {} th ...", - pulsar.getLookupServiceAddress(), ++retry, e); - try { - Thread.sleep(Math.min(retry * 10, MAX_ROLE_CHANGE_RETRY_DELAY_IN_MILLIS)); - } catch (InterruptedException ex) { - log.warn("Interrupted while sleeping."); - } + Thread.sleep(Math.min(retry * 10, MAX_ROLE_CHANGE_RETRY_DELAY_IN_MILLIS)); + } catch (InterruptedException ex) { + log.warn("Interrupted while sleeping."); } } - role = Follower; - log.info("This broker:{} plays a follower now.", pulsar.getLookupServiceAddress()); } + role = Follower; + log.info("This broker:{} plays a follower now.", pulsar.getLookupServiceAddress()); // flush the load data when the leader is elected. - if (brokerLoadDataReporter != null) { - brokerLoadDataReporter.reportAsync(true); - } - if (topBundleLoadDataReporter != null) { - topBundleLoadDataReporter.reportAsync(true); - } + brokerLoadDataReporter.reportAsync(true); + topBundleLoadDataReporter.reportAsync(true); } public List getMetrics() { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStore.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStore.java index 680a36523a214e..a7deeeaad8a5cf 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStore.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStore.java @@ -81,9 +81,26 @@ public interface LoadDataStore extends Closeable { */ void closeTableView() throws IOException; + + /** + * Starts the data store (both producer and table view). + */ + void start() throws LoadDataStoreException; + + /** + * Inits the data store (close and start the data store). + */ + void init() throws IOException; + /** * Starts the table view. */ void startTableView() throws LoadDataStoreException; + + /** + * Starts the producer. + */ + void startProducer() throws LoadDataStoreException; + } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java index a400163ebf1222..1ba3329f6cc008 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java @@ -39,7 +39,7 @@ public class TableViewLoadDataStoreImpl implements LoadDataStore { private TableView tableView; - private final Producer producer; + private Producer producer; private final PulsarClient client; @@ -50,7 +50,6 @@ public class TableViewLoadDataStoreImpl implements LoadDataStore { public TableViewLoadDataStoreImpl(PulsarClient client, String topic, Class clazz) throws LoadDataStoreException { try { this.client = client; - this.producer = client.newProducer(Schema.JSON(clazz)).topic(topic).create(); this.topic = topic; this.clazz = clazz; } catch (Exception e) { @@ -99,6 +98,12 @@ public void closeTableView() throws IOException { } } + @Override + public void start() throws LoadDataStoreException { + startProducer(); + startTableView(); + } + @Override public void startTableView() throws LoadDataStoreException { if (tableView == null) { @@ -111,14 +116,33 @@ public void startTableView() throws LoadDataStoreException { } } + @Override + public void startProducer() throws LoadDataStoreException { + if (producer == null) { + try { + producer = client.newProducer(Schema.JSON(clazz)).topic(topic).create(); + } catch (PulsarClientException e) { + producer = null; + throw new LoadDataStoreException(e); + } + } + } + @Override public void close() throws IOException { if (producer != null) { producer.close(); + producer = null; } closeTableView(); } + @Override + public void init() throws IOException { + close(); + start(); + } + private void validateTableViewStart() { if (tableView == null) { throw new IllegalStateException("table view has not been started"); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java index d207ecd56ee7b1..bb7416ddc41035 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java @@ -1099,12 +1099,12 @@ public void testRoleChange() throws Exception { FieldUtils.writeDeclaredField(secondaryLoadManager, "topBundlesLoadDataStore", topBundlesLoadDataStoreSecondarySpy, true); if (channel1.isChannelOwnerAsync().get(5, TimeUnit.SECONDS)) { - primaryLoadManager.playFollower(); - primaryLoadManager.playFollower(); + primaryLoadManager.playFollower(); // close 3 times + primaryLoadManager.playFollower(); // close 1 time secondaryLoadManager.playLeader(); secondaryLoadManager.playLeader(); - primaryLoadManager.playLeader(); - primaryLoadManager.playLeader(); + primaryLoadManager.playLeader(); // close 3 times and open 3 times + primaryLoadManager.playLeader(); // close 1 time and open 1 time, secondaryLoadManager.playFollower(); secondaryLoadManager.playFollower(); } else { @@ -1119,10 +1119,10 @@ public void testRoleChange() throws Exception { } - verify(topBundlesLoadDataStorePrimarySpy, times(3)).startTableView(); - verify(topBundlesLoadDataStorePrimarySpy, times(3)).closeTableView(); - verify(topBundlesLoadDataStoreSecondarySpy, times(3)).startTableView(); - verify(topBundlesLoadDataStoreSecondarySpy, times(3)).closeTableView(); + verify(topBundlesLoadDataStorePrimarySpy, times(4)).startTableView(); + verify(topBundlesLoadDataStorePrimarySpy, times(8)).closeTableView(); + verify(topBundlesLoadDataStoreSecondarySpy, times(4)).startTableView(); + verify(topBundlesLoadDataStoreSecondarySpy, times(8)).closeTableView(); FieldUtils.writeDeclaredField(primaryLoadManager, "topBundlesLoadDataStore", topBundlesLoadDataStorePrimary, true); FieldUtils.writeDeclaredField(secondaryLoadManager, "topBundlesLoadDataStore", topBundlesLoadDataStoreSecondary, true); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerFilterTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerFilterTestBase.java index 68bd7b29094cd3..a120ef473e9a5e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerFilterTestBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerFilterTestBase.java @@ -90,10 +90,25 @@ public void closeTableView() throws IOException { } + @Override + public void start() throws LoadDataStoreException { + + } + + @Override + public void init() throws IOException { + + } + @Override public void startTableView() throws LoadDataStoreException { } + + @Override + public void startProducer() throws LoadDataStoreException { + + } }; configuration.setPreferLaterVersions(true); doReturn(configuration).when(mockContext).brokerConfiguration(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java index 26d95a0158d528..4eec6124777582 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java @@ -383,10 +383,25 @@ public void closeTableView() throws IOException { } + @Override + public void start() throws LoadDataStoreException { + + } + + @Override + public void init() throws IOException { + + } + @Override public void startTableView() throws LoadDataStoreException { } + + @Override + public void startProducer() throws LoadDataStoreException { + + } }; var topBundleLoadDataStore = new LoadDataStore() { @@ -436,10 +451,25 @@ public void closeTableView() throws IOException { } + @Override + public void start() throws LoadDataStoreException { + + } + + @Override + public void init() throws IOException { + + } + @Override public void startTableView() throws LoadDataStoreException { } + + @Override + public void startProducer() throws LoadDataStoreException { + + } }; BrokerRegistry brokerRegistry = mock(BrokerRegistry.class); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStoreTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStoreTest.java index 184c337a47c805..7431b9815f93fb 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStoreTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStoreTest.java @@ -75,6 +75,7 @@ public void testPushGetAndRemove() throws Exception { @Cleanup LoadDataStore loadDataStore = LoadDataStoreFactory.create(pulsar.getClient(), topic, MyClass.class); + loadDataStore.startProducer(); loadDataStore.startTableView(); MyClass myClass1 = new MyClass("1", 1); loadDataStore.pushAsync("key1", myClass1).get(); @@ -108,6 +109,7 @@ public void testForEach() throws Exception { @Cleanup LoadDataStore loadDataStore = LoadDataStoreFactory.create(pulsar.getClient(), topic, Integer.class); + loadDataStore.startProducer(); loadDataStore.startTableView(); Map map = new HashMap<>(); @@ -132,6 +134,7 @@ public void testTableViewRestart() throws Exception { String topic = TopicDomain.persistent + "://" + NamespaceName.SYSTEM_NAMESPACE + "/" + UUID.randomUUID(); LoadDataStore loadDataStore = LoadDataStoreFactory.create(pulsar.getClient(), topic, Integer.class); + loadDataStore.startProducer(); loadDataStore.startTableView(); loadDataStore.pushAsync("1", 1).get(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/LeastResourceUsageWithWeightTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/LeastResourceUsageWithWeightTest.java index 0eea1d87513bf5..b1e09bf2f3afb2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/LeastResourceUsageWithWeightTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/LeastResourceUsageWithWeightTest.java @@ -252,10 +252,25 @@ public void closeTableView() throws IOException { } + @Override + public void start() throws LoadDataStoreException { + + } + + @Override + public void init() throws IOException { + + } + @Override public void startTableView() throws LoadDataStoreException { } + + @Override + public void startProducer() throws LoadDataStoreException { + + } }; doReturn(conf).when(ctx).brokerConfiguration(); diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java index 158827ca34db6a..37005819c6de0f 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java @@ -308,7 +308,7 @@ public void testAntiaffinityPolicy() throws PulsarAdminException { assertEquals(result.size(), NUM_BROKERS); } - @Test(timeOut = 40 * 1000) + @Test(timeOut = 40 * 1000, invocationCount = 10) public void testIsolationPolicy() throws Exception { final String namespaceIsolationPolicyName = "my-isolation-policy"; final String isolationEnabledNameSpace = DEFAULT_TENANT + "/my-isolation-policy" + nsSuffix; @@ -316,23 +316,40 @@ public void testIsolationPolicy() throws Exception { parameters1.put("min_limit", "1"); parameters1.put("usage_threshold", "100"); - List activeBrokers = admin.brokers().getActiveBrokers(); + Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted( + () -> { + List activeBrokers = admin.brokers().getActiveBrokers(); + assertEquals(activeBrokers.size(), NUM_BROKERS); + } + ); + try { + admin.namespaces().createNamespace(isolationEnabledNameSpace); + } catch (PulsarAdminException.ConflictException e) { + //expected when retried + } - assertEquals(activeBrokers.size(), NUM_BROKERS); + try { + admin.clusters() + .createNamespaceIsolationPolicy(clusterName, namespaceIsolationPolicyName, NamespaceIsolationData + .builder() + .namespaces(List.of(isolationEnabledNameSpace)) + .autoFailoverPolicy(AutoFailoverPolicyData.builder() + .policyType(AutoFailoverPolicyType.min_available) + .parameters(parameters1) + .build()) + .primary(List.of(getHostName(0))) + .secondary(List.of(getHostName(1))) + .build()); + } catch (PulsarAdminException.ConflictException e) { + //expected when retried + } - admin.namespaces().createNamespace(isolationEnabledNameSpace); - admin.clusters().createNamespaceIsolationPolicy(clusterName, namespaceIsolationPolicyName, NamespaceIsolationData - .builder() - .namespaces(List.of(isolationEnabledNameSpace)) - .autoFailoverPolicy(AutoFailoverPolicyData.builder() - .policyType(AutoFailoverPolicyType.min_available) - .parameters(parameters1) - .build()) - .primary(List.of(getHostName(0))) - .secondary(List.of(getHostName(1))) - .build()); final String topic = "persistent://" + isolationEnabledNameSpace + "/topic"; - admin.topics().createNonPartitionedTopic(topic); + try { + admin.topics().createNonPartitionedTopic(topic); + } catch (PulsarAdminException.ConflictException e) { + //expected when retried + } String broker = admin.lookups().lookupTopic(topic);