diff --git a/conf/standalone.conf b/conf/standalone.conf index 12be0e64109a85..5c94d63817a12f 100644 --- a/conf/standalone.conf +++ b/conf/standalone.conf @@ -832,7 +832,7 @@ managedLedgerTraceTaskExecution=true ### --- Load balancer --- ### -loadManagerClassName=org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl +loadManagerClassName=org.apache.pulsar.broker.loadbalance.NoopLoadManager # Enable load balancer loadBalancerEnabled=false diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java index 3a5e10373ee8aa..68b38080e73a19 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java @@ -293,7 +293,7 @@ public synchronized void start() throws PulsarServerException { log.info("Closed the channel producer."); } } - + PulsarClusterMetadataSetup.createTenantIfAbsent (pulsar.getPulsarResources(), SYSTEM_NAMESPACE.getTenant(), config.getClusterName()); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index 4492f9c8094356..6b22fddd1d6172 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -824,6 +824,11 @@ public CompletableFuture unloadNamespaceBundle(NamespaceBundle bundle, } public CompletableFuture isNamespaceBundleOwned(NamespaceBundle bundle) { + if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) { + ExtensibleLoadManagerImpl extensibleLoadManager = ExtensibleLoadManagerImpl.get(loadManager.get()); + return extensibleLoadManager.getOwnershipAsync(Optional.empty(), bundle) + .thenApply(Optional::isPresent); + } return pulsar.getLocalMetadataStore().exists(ServiceUnitUtils.path(bundle)); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 8146d8560da0f8..b4d0f38b4a4dc5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -1789,19 +1789,15 @@ private CompletableFuture checkTopicAlreadyMigrated(TopicName topicName) { return CompletableFuture.completedFuture(null); } CompletableFuture result = new CompletableFuture<>(); - if (ExtensibleLoadManagerImpl.isInternalTopic(topicName.toString())) { - result.complete(null); - } else { - AbstractTopic.isClusterMigrationEnabled(pulsar, topicName.toString()).handle((isMigrated, ex) -> { - if (isMigrated) { - result.completeExceptionally( - new BrokerServiceException.TopicMigratedException(topicName + " already migrated")); - } else { - result.complete(null); - } - return null; - }); - } + AbstractTopic.isClusterMigrationEnabled(pulsar, topicName.toString()).handle((isMigrated, ex) -> { + if (isMigrated) { + result.completeExceptionally( + new BrokerServiceException.TopicMigratedException(topicName + " already migrated")); + } else { + result.complete(null); + } + return null; + }); return result; } @@ -2081,12 +2077,7 @@ public void checkGC() { } public void checkClusterMigration() { - topics.forEach((n, t) -> { - Optional topic = extractTopic(t); - if (topic.isPresent() && !ExtensibleLoadManagerImpl.isInternalTopic(topic.get().getName())) { - topic.ifPresent(Topic::checkClusterMigration); - } - }); + forEachTopic(Topic::checkClusterMigration); } public void checkMessageExpiry() { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java index 0ac06d6883ff12..9a3a0a7d83d506 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java @@ -589,7 +589,8 @@ public CompletableFuture stopReplProducers() { @Override public CompletableFuture checkReplication() { TopicName name = TopicName.get(topic); - if (!name.isGlobal() || NamespaceService.isHeartbeatNamespace(name)) { + if (!name.isGlobal() || NamespaceService.isHeartbeatNamespace(name) + || ExtensibleLoadManagerImpl.isInternalTopic(topic)) { return CompletableFuture.completedFuture(null); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 3c9ab04d79a0d8..334e84a35df1e2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -1704,7 +1704,8 @@ CompletableFuture checkPersistencePolicies() { @Override public CompletableFuture checkReplication() { TopicName name = TopicName.get(topic); - if (!name.isGlobal() || NamespaceService.isHeartbeatNamespace(name)) { + if (!name.isGlobal() || NamespaceService.isHeartbeatNamespace(name) + || ExtensibleLoadManagerImpl.isInternalTopic(topic)) { return CompletableFuture.completedFuture(null); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java index 1050d9f33b4657..d0be7dfad07677 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java @@ -2095,10 +2095,11 @@ public void testDeleteTopicPolicyWhenDeleteSystemTopic() throws Exception { startBroker(); String systemTopic = SYSTEM_NAMESPACE.toString() + "/" + "testDeleteTopicPolicyWhenDeleteSystemTopic"; - admin.tenants().createTenant(SYSTEM_NAMESPACE.getTenant(), + + updateTenant(SYSTEM_NAMESPACE.getTenant(), new TenantInfoImpl(Set.of("role1", "role2"), Set.of("use", "usc", "usw"))); - admin.namespaces().createNamespace(SYSTEM_NAMESPACE.toString()); + setupSystemNamespace(); @Cleanup Producer producer = pulsarClient.newProducer(Schema.STRING) .topic(systemTopic).create(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java index 416ef0577ccb03..32812ab093cbc8 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java @@ -563,6 +563,14 @@ protected void setupSystemNamespace() throws Exception { } } + protected void updateTenant(String tenant, TenantInfoImpl tenantInfo) throws Exception { + if (!admin.tenants().getTenants().contains(tenant)) { + admin.tenants().createTenant(tenant, tenantInfo); + } else { + admin.tenants().updateTenant(tenant, tenantInfo); + } + } + protected Object asyncRequests(Consumer function) throws Exception { TestAsyncResponse ctx = new TestAsyncResponse(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadReportNetworkLimitTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadReportNetworkLimitTest.java index ffa2607b6b42e9..901b52d5d2c61c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadReportNetworkLimitTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadReportNetworkLimitTest.java @@ -22,6 +22,7 @@ import java.util.Optional; import org.apache.commons.lang3.SystemUtils; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; +import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl; import org.apache.pulsar.policies.data.loadbalancer.LoadManagerReport; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; @@ -55,8 +56,10 @@ public void cleanup() throws Exception { @Test public void checkLoadReportNicSpeed() throws Exception { + if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) { + return; + } // Since we have overridden the NIC speed in the configuration, the load report for the broker should always - LoadManagerReport report = admin.brokerStats().getLoadReport(); if (SystemUtils.IS_OS_LINUX) { diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/SuperUserAuthedAdminProxyHandlerTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/SuperUserAuthedAdminProxyHandlerTest.java index 57522186c8f169..8707257cecb59d 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/SuperUserAuthedAdminProxyHandlerTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/SuperUserAuthedAdminProxyHandlerTest.java @@ -69,6 +69,13 @@ protected void setup() throws Exception { conf.setAuthenticationProviders(ImmutableSet.of(AuthenticationProviderTls.class.getName())); conf.setNumExecutorThreadPoolSize(5); + conf.setBrokerClientTlsEnabled(true); + conf.setBrokerClientAuthenticationPlugin(AuthenticationTls.class.getName()); + conf.setBrokerClientAuthenticationParameters( + String.format("tlsCertFile:%s,tlsKeyFile:%s", + getTlsFileForClient("admin.cert"), getTlsFileForClient("admin.key-pk8"))); + conf.setBrokerClientTrustCertsFilePath(CA_CERT_FILE_PATH); + super.internalSetup(); // start proxy service @@ -135,7 +142,7 @@ public void testAuthenticatedProxyAsAdmin() throws Exception { adminAdmin.tenants().createTenant("tenant1", new TenantInfoImpl(ImmutableSet.of("randoUser"), ImmutableSet.of(configClusterName))); - Assert.assertEquals(ImmutableSet.of("tenant1"), adminAdmin.tenants().getTenants()); + Assert.assertTrue(adminAdmin.tenants().getTenants().contains("tenant1")); } }