Skip to content

Commit

Permalink
[improve][broker] Support to specify auth-plugin, auth-parameters and…
Browse files Browse the repository at this point in the history
… tls-enable arguments when init cluster metadata (apache#23087)

### Motivation

When using a global configuration store and geo-replication, support to specify `auth-plugin`, `auth-parameters`, and `tls-enable` arguments when init cluster metadata will be useful, it can reduce one step to create the cluster with auth.

### Modifications

Support to specify `auth-plugin`, `auth-parameters` and `tls-enable` arguments when init cluster metadata
  • Loading branch information
Demogorgon314 authored Jul 29, 2024
1 parent 77b6378 commit 49d3beb
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,19 @@ private static class Arguments {
description = "Broker-service URL for new cluster with TLS encryption", required = false)
private String clusterBrokerServiceUrlTls;

@Option(names = {"-te",
"--tls-enable"},
description = "Enable TLS connection for new cluster")
private Boolean clusterBrokerClientTlsEnabled;

@Option(names = "--auth-plugin",
description = "The authentication plugin for new cluster")
protected String clusterAuthenticationPlugin;

@Option(names = "--auth-parameters",
description = "The authentication parameters for new cluster")
protected String clusterAuthenticationParameters;

@Option(names = {"-zk",
"--zookeeper"}, description = "Local ZooKeeper quorum connection string",
required = false,
Expand Down Expand Up @@ -317,14 +330,36 @@ private static void initializeCluster(Arguments arguments, int bundleNumberForDe

PulsarResources resources = new PulsarResources(localStore, configStore);

ClusterData clusterData = ClusterData.builder()
.serviceUrl(arguments.clusterWebServiceUrl)
.serviceUrlTls(arguments.clusterWebServiceUrlTls)
.brokerServiceUrl(arguments.clusterBrokerServiceUrl)
.brokerServiceUrlTls(arguments.clusterBrokerServiceUrlTls)
.proxyServiceUrl(arguments.clusterProxyUrl)
.proxyProtocol(arguments.clusterProxyProtocol)
.build();
ClusterData.Builder clusterDataBuilder = ClusterData.builder();
if (arguments.clusterWebServiceUrl != null) {
clusterDataBuilder.serviceUrl(arguments.clusterWebServiceUrl);
}
if (arguments.clusterWebServiceUrlTls != null) {
clusterDataBuilder.serviceUrlTls(arguments.clusterWebServiceUrlTls);
}
if (arguments.clusterBrokerServiceUrl != null) {
clusterDataBuilder.brokerServiceUrl(arguments.clusterBrokerServiceUrl);
}
if (arguments.clusterBrokerServiceUrlTls != null) {
clusterDataBuilder.brokerServiceUrlTls(arguments.clusterBrokerServiceUrlTls);
}
if (arguments.clusterBrokerClientTlsEnabled != null) {
clusterDataBuilder.brokerClientTlsEnabled(arguments.clusterBrokerClientTlsEnabled);
}
if (arguments.clusterAuthenticationPlugin != null) {
clusterDataBuilder.authenticationPlugin(arguments.clusterAuthenticationPlugin);
}
if (arguments.clusterAuthenticationParameters != null) {
clusterDataBuilder.authenticationParameters(arguments.clusterAuthenticationParameters);
}
if (arguments.clusterProxyUrl != null) {
clusterDataBuilder.proxyServiceUrl(arguments.clusterProxyUrl);
}
if (arguments.clusterProxyProtocol != null) {
clusterDataBuilder.proxyProtocol(arguments.clusterProxyProtocol);
}

ClusterData clusterData = clusterDataBuilder.build();
if (!resources.getClusterResources().clusterExists(arguments.cluster)) {
resources.getClusterResources().createCluster(arguments.cluster, clusterData);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.pulsar.PulsarInitialNamespaceSetup;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.broker.resources.TenantResources;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.functions.worker.WorkerUtils;
Expand Down Expand Up @@ -86,6 +87,48 @@ public void testReSetupClusterMetadata() throws Exception {
PulsarClusterMetadataSetup.main(args);
SortedMap<String, String> data3 = localZkS.dumpData();
assertEquals(data1, data3);
String clusterDataJson = data1.get("/admin/clusters/testReSetupClusterMetadata-cluster");
assertNotNull(clusterDataJson);
ClusterData clusterData = ObjectMapperFactory
.getMapper()
.reader()
.readValue(clusterDataJson, ClusterData.class);
assertEquals(clusterData.getServiceUrl(), "http://127.0.0.1:8080");
assertEquals(clusterData.getServiceUrlTls(), "https://127.0.0.1:8443");
assertEquals(clusterData.getBrokerServiceUrl(), "pulsar://127.0.0.1:6650");
assertEquals(clusterData.getBrokerServiceUrlTls(), "pulsar+ssl://127.0.0.1:6651");
assertFalse(clusterData.isBrokerClientTlsEnabled());
}

public void testSetupClusterMetadataWithAuthEnabled() throws Exception {
String clusterName = "cluster-with-auth";
String[] args = {
"--cluster", clusterName,
"--zookeeper", "127.0.0.1:" + localZkS.getZookeeperPort(),
"--configuration-store", "127.0.0.1:" + localZkS.getZookeeperPort(),
"--web-service-url", "http://127.0.0.1:8080",
"--web-service-url-tls", "https://127.0.0.1:8443",
"--broker-service-url", "pulsar://127.0.0.1:6650",
"--broker-service-url-tls", "pulsar+ssl://127.0.0.1:6651",
"--tls-enable",
"--auth-plugin", "org.apache.pulsar.client.impl.auth.AuthenticationToken",
"--auth-parameters", "token:my-token"
};
PulsarClusterMetadataSetup.main(args);
SortedMap<String, String> data = localZkS.dumpData();
String clusterDataJson = data.get("/admin/clusters/" + clusterName);
assertNotNull(clusterDataJson);
ClusterData clusterData = ObjectMapperFactory
.getMapper()
.reader()
.readValue(clusterDataJson, ClusterData.class);
assertEquals(clusterData.getServiceUrl(), "http://127.0.0.1:8080");
assertEquals(clusterData.getServiceUrlTls(), "https://127.0.0.1:8443");
assertEquals(clusterData.getBrokerServiceUrl(), "pulsar://127.0.0.1:6650");
assertEquals(clusterData.getBrokerServiceUrlTls(), "pulsar+ssl://127.0.0.1:6651");
assertTrue(clusterData.isBrokerClientTlsEnabled());
assertEquals(clusterData.getAuthenticationPlugin(), "org.apache.pulsar.client.impl.auth.AuthenticationToken");
assertEquals(clusterData.getAuthenticationParameters(), "token:my-token");
}

@DataProvider(name = "bundleNumberForDefaultNamespace")
Expand Down

0 comments on commit 49d3beb

Please sign in to comment.