From 5b56c6584995e84d068db785bd2dbbade44d61ea Mon Sep 17 00:00:00 2001 From: Demogorgon314 Date: Tue, 5 Nov 2024 11:07:26 +0800 Subject: [PATCH 1/3] Support cleanup replication cluster for all tenants and namespaces when cluster metadata teardown --- .../pulsar/PulsarClusterMetadataTeardown.java | 32 ++++++++++++++++++- 1 file changed, 31 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataTeardown.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataTeardown.java index 964a49fe10f20..6aa0f2d45ec54 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataTeardown.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataTeardown.java @@ -19,6 +19,8 @@ package org.apache.pulsar; import com.google.protobuf.InvalidProtocolBufferException; + +import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; @@ -29,12 +31,18 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.ManagedLedgerFactory; import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl; +import org.apache.pulsar.broker.resources.NamespaceResources; +import org.apache.pulsar.broker.resources.PulsarResources; +import org.apache.pulsar.broker.resources.TenantResources; import org.apache.pulsar.broker.service.schema.SchemaStorageFormat.SchemaLocator; +import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.policies.data.TenantInfo; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.docs.tools.CmdGenerateDocs; import org.apache.pulsar.metadata.api.MetadataStore; import org.apache.pulsar.metadata.api.MetadataStoreConfig; +import org.apache.pulsar.metadata.api.MetadataStoreException; import org.apache.pulsar.metadata.api.MetadataStoreFactory; import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; import org.apache.pulsar.metadata.impl.ZKMetadataStore; @@ -153,7 +161,29 @@ public static void main(String[] args) throws Exception { MetadataStoreConfig.builder().sessionTimeoutMillis(arguments.zkSessionTimeoutMillis) .configFilePath(arguments.configurationStoreConfigPath) .metadataStoreName(MetadataStoreConfig.CONFIGURATION_METADATA_STORE).build()); - deleteRecursively(configMetadataStore, "/admin/clusters/" + arguments.cluster).join(); + PulsarResources resources = new PulsarResources(metadataStore, configMetadataStore); + resources.getClusterResources().deleteCluster(arguments.cluster); + // Cleanup replication cluster from all tenants and namespaces + TenantResources tenantResources = resources.getTenantResources(); + NamespaceResources namespaceResources = resources.getNamespaceResources(); + List tenants = tenantResources.listTenants(); + for (String tenant : tenants) { + Optional tenantInfoOptional = tenantResources.getTenant(tenant); + if (tenantInfoOptional.isEmpty()) { + continue; + } + tenantResources.updateTenantAsync(tenant, ti -> { + ti.getAllowedClusters().remove(arguments.cluster); + return ti; + }).get(); + List namespaces = namespaceResources.listNamespacesAsync(tenant).get(); + for (String namespace : namespaces) { + namespaceResources.setPolicies(NamespaceName.get(namespace), policies -> { + policies.replication_clusters.remove(arguments.cluster); + return policies; + }); + } + } } log.info("Cluster metadata for '{}' teardown.", arguments.cluster); From cea283cfcad25415868a1121d70bec4552fd2f71 Mon Sep 17 00:00:00 2001 From: Demogorgon314 Date: Tue, 5 Nov 2024 14:13:41 +0800 Subject: [PATCH 2/3] Support cleanup `replication cluster` and `allowed cluster` when cluster metadata teardown --- .../pulsar/PulsarClusterMetadataTeardown.java | 26 ++++--- .../ClusterMetadataTeardownTest.java | 74 ++++++++++++++++++- 2 files changed, 86 insertions(+), 14 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataTeardown.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataTeardown.java index 6aa0f2d45ec54..9b9cf1baa4247 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataTeardown.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataTeardown.java @@ -19,10 +19,10 @@ package org.apache.pulsar; import com.google.protobuf.InvalidProtocolBufferException; - import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; import lombok.Cleanup; import org.apache.bookkeeper.client.BKException; @@ -168,27 +168,33 @@ public static void main(String[] args) throws Exception { NamespaceResources namespaceResources = resources.getNamespaceResources(); List tenants = tenantResources.listTenants(); for (String tenant : tenants) { - Optional tenantInfoOptional = tenantResources.getTenant(tenant); - if (tenantInfoOptional.isEmpty()) { - continue; - } - tenantResources.updateTenantAsync(tenant, ti -> { - ti.getAllowedClusters().remove(arguments.cluster); - return ti; - }).get(); List namespaces = namespaceResources.listNamespacesAsync(tenant).get(); for (String namespace : namespaces) { - namespaceResources.setPolicies(NamespaceName.get(namespace), policies -> { + namespaceResources.setPolicies(NamespaceName.get(tenant, namespace), policies -> { policies.replication_clusters.remove(arguments.cluster); return policies; }); } + removeCurrentClusterFromAllowedClusters(tenantResources, tenant, arguments.cluster); } } log.info("Cluster metadata for '{}' teardown.", arguments.cluster); } + private static void removeCurrentClusterFromAllowedClusters( + TenantResources tenantResources, String tenant, String curCluster) + throws MetadataStoreException, InterruptedException, ExecutionException { + Optional tenantInfoOptional = tenantResources.getTenant(tenant); + if (tenantInfoOptional.isEmpty()) { + return; + } + tenantResources.updateTenantAsync(tenant, ti -> { + ti.getAllowedClusters().remove(curCluster); + return ti; + }).get(); + } + private static CompletableFuture deleteRecursively(MetadataStore metadataStore, String path) { return metadataStore.getChildren(path) .thenCompose(children -> FutureUtil.waitForAll( diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/zookeeper/ClusterMetadataTeardownTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/zookeeper/ClusterMetadataTeardownTest.java index 5184afade9c85..c689bb60fedf7 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/zookeeper/ClusterMetadataTeardownTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/zookeeper/ClusterMetadataTeardownTest.java @@ -21,6 +21,7 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertTrue; import java.util.SortedMap; import org.apache.pulsar.PulsarClusterMetadataSetup; import org.apache.pulsar.PulsarClusterMetadataTeardown; @@ -54,7 +55,7 @@ void cleanup() { @Test public void testSetupClusterMetadataAndTeardown() throws Exception { String[] args1 = { - "--cluster", "testReSetupClusterMetadata-cluster", + "--cluster", "cluster1", "--zookeeper", "127.0.0.1:" + localZkS.getZookeeperPort(), "--configuration-store", "127.0.0.1:" + localZkS.getZookeeperPort(), "--configuration-metadata-store-config-path", "src/test/resources/conf/zk_client_enable_sasl.conf", @@ -65,7 +66,7 @@ public void testSetupClusterMetadataAndTeardown() throws Exception { }; PulsarClusterMetadataSetup.main(args1); SortedMap data1 = localZkS.dumpData(); - String clusterDataJson = data1.get("/admin/clusters/testReSetupClusterMetadata-cluster"); + String clusterDataJson = data1.get("/admin/clusters/cluster1"); assertNotNull(clusterDataJson); ClusterData clusterData = ObjectMapperFactory .getMapper() @@ -78,13 +79,78 @@ public void testSetupClusterMetadataAndTeardown() throws Exception { assertFalse(clusterData.isBrokerClientTlsEnabled()); String[] args2 = { - "--cluster", "testReSetupClusterMetadata-cluster", + "--cluster", "cluster1", "--zookeeper", "127.0.0.1:" + localZkS.getZookeeperPort(), "--configuration-store", "127.0.0.1:" + localZkS.getZookeeperPort(), "--configuration-metadata-store-config-path", "src/test/resources/conf/zk_client_enable_sasl.conf", }; PulsarClusterMetadataTeardown.main(args2); SortedMap data2 = localZkS.dumpData(); - assertFalse(data2.containsKey("/admin/clusters/testReSetupClusterMetadata-cluster")); + assertFalse(data2.containsKey("/admin/clusters/cluster1")); + } + + @Test + public void testSetupMultipleClusterMetadataAndTeardown() throws Exception { + String[] cluster1Args = { + "--cluster", "cluster1", + "--zookeeper", "127.0.0.1:" + localZkS.getZookeeperPort(), + "--configuration-store", "127.0.0.1:" + localZkS.getZookeeperPort(), + "--configuration-metadata-store-config-path", "src/test/resources/conf/zk_client_enable_sasl.conf", + "--web-service-url", "http://127.0.0.1:8080", + "--web-service-url-tls", "https://127.0.0.1:8443", + "--broker-service-url", "pulsar://127.0.0.1:6650", + "--broker-service-url-tls", "pulsar+ssl://127.0.0.1:6651" + }; + PulsarClusterMetadataSetup.main(cluster1Args); + String[] cluster2Args = { + "--cluster", "cluster2", + "--zookeeper", "127.0.0.1:" + localZkS.getZookeeperPort(), + "--configuration-store", "127.0.0.1:" + localZkS.getZookeeperPort(), + "--configuration-metadata-store-config-path", "src/test/resources/conf/zk_client_enable_sasl.conf", + "--web-service-url", "http://127.0.0.1:8081", + "--web-service-url-tls", "https://127.0.0.1:8445", + "--broker-service-url", "pulsar://127.0.0.1:6651", + "--broker-service-url-tls", "pulsar+ssl://127.0.0.1:6652" + }; + PulsarClusterMetadataSetup.main(cluster2Args); + SortedMap data1 = localZkS.dumpData(); + String clusterDataJson = data1.get("/admin/clusters/cluster1"); + assertNotNull(clusterDataJson); + ClusterData clusterData = ObjectMapperFactory + .getMapper() + .reader() + .readValue(clusterDataJson, ClusterData.class); + assertEquals(clusterData.getServiceUrl(), "http://127.0.0.1:8080"); + assertEquals(clusterData.getServiceUrlTls(), "https://127.0.0.1:8443"); + assertEquals(clusterData.getBrokerServiceUrl(), "pulsar://127.0.0.1:6650"); + assertEquals(clusterData.getBrokerServiceUrlTls(), "pulsar+ssl://127.0.0.1:6651"); + assertFalse(clusterData.isBrokerClientTlsEnabled()); + + String[] args2 = { + "--cluster", "cluster1", + "--zookeeper", "127.0.0.1:" + localZkS.getZookeeperPort(), + "--configuration-store", "127.0.0.1:" + localZkS.getZookeeperPort(), + "--configuration-metadata-store-config-path", "src/test/resources/conf/zk_client_enable_sasl.conf", + }; + PulsarClusterMetadataTeardown.main(args2); + SortedMap data2 = localZkS.dumpData(); + assertFalse(data2.containsKey("/admin/clusters/cluster1")); + assertTrue(data2.containsKey("/admin/clusters/cluster2")); + + assertTrue(data2.containsKey("/admin/policies/public")); + assertFalse(data2.get("/admin/policies/public").contains("cluster1")); + assertTrue(data2.get("/admin/policies/public").contains("cluster2")); + + assertTrue(data2.containsKey("/admin/policies/pulsar")); + assertFalse(data2.get("/admin/policies/pulsar").contains("cluster1")); + assertTrue(data2.get("/admin/policies/pulsar").contains("cluster2")); + + assertTrue(data2.containsKey("/admin/policies/public/default")); + assertFalse(data2.get("/admin/policies/public/default").contains("cluster1")); + assertTrue(data2.get("/admin/policies/public/default").contains("cluster2")); + + assertTrue(data2.containsKey("/admin/policies/pulsar/system")); + assertFalse(data2.get("/admin/policies/pulsar/system").contains("cluster1")); + assertTrue(data2.get("/admin/policies/pulsar/system").contains("cluster2")); } } From 9519f6981637644c8d3b96441146ed9e6bc64c0f Mon Sep 17 00:00:00 2001 From: Demogorgon314 Date: Wed, 6 Nov 2024 10:08:08 +0800 Subject: [PATCH 3/3] Ignore the NotFoundException --- .../org/apache/pulsar/PulsarClusterMetadataTeardown.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataTeardown.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataTeardown.java index 9b9cf1baa4247..30a0dabea9812 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataTeardown.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataTeardown.java @@ -162,7 +162,6 @@ public static void main(String[] args) throws Exception { .configFilePath(arguments.configurationStoreConfigPath) .metadataStoreName(MetadataStoreConfig.CONFIGURATION_METADATA_STORE).build()); PulsarResources resources = new PulsarResources(metadataStore, configMetadataStore); - resources.getClusterResources().deleteCluster(arguments.cluster); // Cleanup replication cluster from all tenants and namespaces TenantResources tenantResources = resources.getTenantResources(); NamespaceResources namespaceResources = resources.getNamespaceResources(); @@ -177,6 +176,12 @@ public static void main(String[] args) throws Exception { } removeCurrentClusterFromAllowedClusters(tenantResources, tenant, arguments.cluster); } + try { + resources.getClusterResources().deleteCluster(arguments.cluster); + } catch (MetadataStoreException.NotFoundException ex) { + // Ignore if the cluster does not exist + log.info("Cluster metadata for '{}' does not exist.", arguments.cluster); + } } log.info("Cluster metadata for '{}' teardown.", arguments.cluster);