diff --git a/.gitignore b/.gitignore index 480322603cbb8..f3fd3531cf321 100644 --- a/.gitignore +++ b/.gitignore @@ -5,6 +5,8 @@ test-results dependency-reduced-pom.xml logs /data +pulsar-broker/tmp.* +pulsar-broker/src/test/resources/log4j.properties *.versionsBackup diff --git a/conf/broker.conf b/conf/broker.conf index 6d23675a8e1d8..d028b4cc452dc 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -37,8 +37,11 @@ webServicePortTls=8443 # Enable the WebSocket API service in broker webSocketServiceEnabled=false -# Control whether to bind directly on localhost rather than on normal hostname -bindOnLocalhost=false +# Hostname or IP address the service binds on, default is 0.0.0.0. +bindAddress=0.0.0.0 + +# Hostname or IP address the service advertises to the outside world. If not set, the value of InetAddress.getLocalHost().getHostName() is used. +advertisedAddress= # Name of the cluster to which this broker belongs to clusterName= diff --git a/conf/standalone.conf b/conf/standalone.conf index 11a9c16a3fc6f..379cb0ac1d33b 100644 --- a/conf/standalone.conf +++ b/conf/standalone.conf @@ -27,8 +27,11 @@ brokerServicePort=6650 # Port to use to server HTTP request webServicePort=8080 -# Control whether to bind directly on localhost rather than on normal hostname -bindOnLocalhost=true +# Hostname or IP address the service binds on, default is 0.0.0.0. +bindAddress=0.0.0.0 + +# Hostname or IP address the service advertises to the outside world. If not set, the value of InetAddress.getLocalHost().getHostName() is used. +advertisedAddress= # Name of the cluster to which this broker belongs to clusterName=standalone diff --git a/conf/websocket.conf b/conf/websocket.conf index d0eee0351c7b2..1dc70cd719fe5 100644 --- a/conf/websocket.conf +++ b/conf/websocket.conf @@ -22,8 +22,8 @@ globalZookeeperServers= # Port to use to server HTTP request webServicePort=8080 -# Control whether to bind directly on localhost rather than on normal hostname -bindOnLocalhost=false +# Hostname or IP address the service binds on, default is 0.0.0.0. +bindAddress=0.0.0.0 # Name of the pulsar cluster to connect to clusterName= diff --git a/pulsar-broker-common/src/main/java/com/yahoo/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/com/yahoo/pulsar/broker/ServiceConfiguration.java index d420e7e9e337e..ec67449827ffe 100644 --- a/pulsar-broker-common/src/main/java/com/yahoo/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/com/yahoo/pulsar/broker/ServiceConfiguration.java @@ -43,9 +43,12 @@ public class ServiceConfiguration { private int webServicePort = 8080; // Port to use to server HTTPS request private int webServicePortTls = 8443; - // Control whether to bind directly on localhost rather than on normal - // hostname - private boolean bindOnLocalhost = false; + + // Hostname or IP address the service binds on. + private String bindAddress = "0.0.0.0"; + + // Controls which hostname is advertised to the discovery service via ZooKeeper. + private String advertisedAddress; // Enable the WebSocket API service private boolean webSocketServiceEnabled = false; @@ -290,12 +293,20 @@ public void setWebServicePortTls(int webServicePortTls) { this.webServicePortTls = webServicePortTls; } - public boolean isBindOnLocalhost() { - return bindOnLocalhost; + public String getBindAddress() { + return this.bindAddress; + } + + public void setBindAddress(String bindAddress) { + this.bindAddress = bindAddress; + } + + public String getAdvertisedAddress() { + return this.advertisedAddress; } - public void setBindOnLocalhost(boolean bindOnLocalhost) { - this.bindOnLocalhost = bindOnLocalhost; + public void setAdvertisedAddress(String advertisedAddress) { + this.advertisedAddress = advertisedAddress; } public boolean isWebSocketServiceEnabled() { diff --git a/pulsar-broker-common/src/main/java/com/yahoo/pulsar/broker/ServiceConfigurationUtils.java b/pulsar-broker-common/src/main/java/com/yahoo/pulsar/broker/ServiceConfigurationUtils.java new file mode 100644 index 0000000000000..a7c0141eafe51 --- /dev/null +++ b/pulsar-broker-common/src/main/java/com/yahoo/pulsar/broker/ServiceConfigurationUtils.java @@ -0,0 +1,44 @@ +/** + * Copyright 2016 Yahoo Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.yahoo.pulsar.broker; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetAddress; +import java.net.UnknownHostException; + +public class ServiceConfigurationUtils { + + private static final Logger LOG = LoggerFactory.getLogger(ServiceConfigurationUtils.class); + + public static String getDefaultOrConfiguredAddress(String configuredAddress) { + if ( configuredAddress == null ) { + return unsafeLocalhostResolve(); + } + return configuredAddress; + } + + public static String unsafeLocalhostResolve() { + try { + return InetAddress.getLocalHost().getHostName(); + } catch (UnknownHostException ex) { + LOG.error(ex.getMessage(), ex); + throw new IllegalStateException("Failed to resolve localhost name.", ex); + } + } + +} diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/PulsarService.java index 040faae248267..045fdc82e2cfd 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/PulsarService.java @@ -18,6 +18,7 @@ import java.io.IOException; import java.net.InetAddress; import java.net.URL; +import java.net.UnknownHostException; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executors; @@ -97,7 +98,8 @@ public class PulsarService implements AutoCloseable { private LoadManager loadManager = null; private PulsarAdmin adminClient = null; private ZooKeeperClientFactory zkClientFactory = null; - private final String host; + private final String bindAddress; + private final String advertisedAddress; private final String webServiceAddress; private final String webServiceAddressTls; private final String brokerServiceUrl; @@ -118,7 +120,8 @@ public enum State { public PulsarService(ServiceConfiguration config) { state = State.Init; - this.host = host(config); + this.bindAddress = ServiceConfigurationUtils.getDefaultOrConfiguredAddress(config.getBindAddress()); + this.advertisedAddress = advertisedAddress(config); this.webServiceAddress = webAddress(config); this.webServiceAddressTls = webAddressTls(config); this.brokerServiceUrl = brokerUrl(config); @@ -319,7 +322,7 @@ private void acquireSLANamespace() { try { // Namespace not created hence no need to unload it if (!this.globalZkCache.exists( - AdminResource.path("policies") + "/" + NamespaceService.getSLAMonitorNamespace(host, config))) { + AdminResource.path("policies") + "/" + NamespaceService.getSLAMonitorNamespace(getAdvertisedAddress(), config))) { return; } if (!this.nsservice.registerSLANamespace()) { @@ -540,8 +543,7 @@ public BookKeeperClientFactory getBookKeeperClientFactory() { public synchronized PulsarAdmin getAdminClient() throws PulsarServerException { if (this.adminClient == null) { try { - String adminApiUrl = "http://" + InetAddress.getLocalHost().getHostName() + ":" - + this.getConfiguration().getWebServicePort(); + String adminApiUrl = webAddress(config); this.adminClient = new PulsarAdmin(new URL(adminApiUrl), this.getConfiguration().getBrokerClientAuthenticationPlugin(), this.getConfiguration().getBrokerClientAuthenticationParameters()); @@ -563,50 +565,44 @@ public MessagingServiceShutdownHook getShutdownService() { } /** - * Derive the host + * Advertised service address. * - * @param isBindOnLocalhost - * @return + * @return Hostname or IP address the service advertises to the outside world. */ - public static String host(ServiceConfiguration config) { - try { - if (!config.isBindOnLocalhost()) { - return InetAddress.getLocalHost().getHostName(); - } else { - return "localhost"; - } - } catch (Exception e) { - LOG.error(e.getMessage(), e); - throw new IllegalStateException("failed to find host", e); - } + public static String advertisedAddress(ServiceConfiguration config) { + return ServiceConfigurationUtils.getDefaultOrConfiguredAddress(config.getAdvertisedAddress()); } public static String brokerUrl(ServiceConfiguration config) { - return "pulsar://" + host(config) + ":" + config.getBrokerServicePort(); + return "pulsar://" + advertisedAddress(config) + ":" + config.getBrokerServicePort(); } public static String brokerUrlTls(ServiceConfiguration config) { if (config.isTlsEnabled()) { - return "pulsar://" + host(config) + ":" + config.getBrokerServicePortTls(); + return "pulsar://" + advertisedAddress(config) + ":" + config.getBrokerServicePortTls(); } else { return ""; } } public static String webAddress(ServiceConfiguration config) { - return String.format("http://%s:%d", host(config), config.getWebServicePort()); + return String.format("http://%s:%d", advertisedAddress(config), config.getWebServicePort()); } public static String webAddressTls(ServiceConfiguration config) { if (config.isTlsEnabled()) { - return String.format("https://%s:%d", host(config), config.getWebServicePortTls()); + return String.format("https://%s:%d", advertisedAddress(config), config.getWebServicePortTls()); } else { return ""; } } - public String getHost() { - return host; + public String getBindAddress() { + return bindAddress; + } + + public String getAdvertisedAddress() { + return advertisedAddress; } public String getWebServiceAddress() { diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/admin/PersistentTopics.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/admin/PersistentTopics.java index 0e8deb30e3b31..bb9e04c5ab7dc 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/admin/PersistentTopics.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/admin/PersistentTopics.java @@ -903,7 +903,7 @@ public PersistentOfflineTopicStats getBacklog(@PathParam("property") String prop } final ManagedLedgerConfig config = pulsar().getBrokerService().getManagedLedgerConfig(dn).get(); ManagedLedgerOfflineBacklog offlineTopicBacklog = new ManagedLedgerOfflineBacklog(config.getDigestType(), - config.getPassword(), pulsar().getHost(), false); + config.getPassword(), pulsar().getAdvertisedAddress(), false); offlineTopicStats = offlineTopicBacklog .estimateUnloadedTopicBacklog((ManagedLedgerFactoryImpl) pulsar().getManagedLedgerFactory(), dn); pulsar().getBrokerService().cacheOfflineTopicStats(dn, offlineTopicStats); diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java index 50842ded5d516..a9a1786d84a9a 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java @@ -241,7 +241,7 @@ public void start() throws PulsarServerException { } } - String lookupServiceAddress = pulsar.getHost() + ":" + conf.getWebServicePort(); + String lookupServiceAddress = pulsar.getAdvertisedAddress() + ":" + conf.getWebServicePort(); brokerZnodePath = LOADBALANCE_BROKERS_ROOT + "/" + lookupServiceAddress; LoadReport loadReport = null; try { @@ -637,7 +637,7 @@ public void writeResourceQuotasToZooKeeper() throws Exception { */ private synchronized void doLoadRanking() { ResourceUnitRanking.setCpuUsageByMsgRate(this.realtimeCpuLoadFactor); - String hostname = pulsar.getHost(); + String hostname = pulsar.getAdvertisedAddress(); String strategy = this.getLoadBalancerPlacementStrategy(); log.info("doLoadRanking - load balancing strategy: {}", strategy); if (!currentLoadReports.isEmpty()) { @@ -1094,7 +1094,7 @@ public LoadReport generateLoadReport() throws Exception { try { LoadReport loadReport = new LoadReport(pulsar.getWebServiceAddress(), pulsar.getWebServiceAddressTls(), pulsar.getBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls()); - loadReport.setName(String.format("%s:%s", pulsar.getHost(), pulsar.getConfiguration().getWebServicePort())); + loadReport.setName(String.format("%s:%s", pulsar.getAdvertisedAddress(), pulsar.getConfiguration().getWebServicePort())); SystemResourceUsage systemResourceUsage = this.getSystemResourceUsage(); loadReport.setOverLoaded( isAboveLoadLevel(systemResourceUsage, this.getLoadBalancerBrokerOverloadedThresholdPercentage())); diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/namespace/NamespaceService.java index ef6b5ec143cec..bfd946552e0d8 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/namespace/NamespaceService.java @@ -118,7 +118,7 @@ public enum AddressType { */ public NamespaceService(PulsarService pulsar) { this.pulsar = pulsar; - host = pulsar.getHost(); + host = pulsar.getAdvertisedAddress(); this.config = pulsar.getConfiguration(); this.loadManager = pulsar.getLoadManager(); ServiceUnitZkUtils.initZK(pulsar.getLocalZkCache().getZooKeeper(), pulsar.getBrokerServiceUrl()); @@ -446,9 +446,9 @@ private NamespaceOwnershipStatus getNamespaceOwnershipStatus(OwnedServiceUnit ns } // found corresponding policy, set the status to controlled nsOwnedStatus.is_controlled = true; - if (nsIsolationPolicy.isPrimaryBroker(pulsar.getHost())) { + if (nsIsolationPolicy.isPrimaryBroker(pulsar.getAdvertisedAddress())) { nsOwnedStatus.broker_assignment = BrokerAssignment.primary; - } else if (nsIsolationPolicy.isSecondaryBroker(pulsar.getHost())) { + } else if (nsIsolationPolicy.isSecondaryBroker(pulsar.getAdvertisedAddress())) { nsOwnedStatus.broker_assignment = BrokerAssignment.secondary; } diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/BrokerService.java index 263337ff47ea4..ecc460ded45dd 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/BrokerService.java @@ -22,21 +22,19 @@ import java.io.Closeable; import java.io.IOException; +import java.net.InetSocketAddress; import java.net.URI; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import org.apache.bookkeeper.client.BookKeeper.DigestType; @@ -72,7 +70,6 @@ import com.yahoo.pulsar.common.naming.DestinationName; import com.yahoo.pulsar.common.naming.NamespaceBundle; import com.yahoo.pulsar.common.naming.NamespaceBundleFactory; -import com.yahoo.pulsar.common.naming.NamespaceBundles; import com.yahoo.pulsar.common.naming.NamespaceName; import com.yahoo.pulsar.common.naming.ServiceUnitId; import com.yahoo.pulsar.common.policies.data.ClusterData; @@ -220,13 +217,13 @@ public void start() throws Exception { bootstrap.childHandler(new PulsarChannelInitializer(this, serviceConfig, false)); // Bind and start to accept incoming connections. - bootstrap.bind(port).sync(); + bootstrap.bind(new InetSocketAddress(pulsar.getBindAddress(), port)).sync(); log.info("Started Pulsar Broker service on port {}", port); if (serviceConfig.isTlsEnabled()) { ServerBootstrap tlsBootstrap = bootstrap.clone(); tlsBootstrap.childHandler(new PulsarChannelInitializer(this, serviceConfig, true)); - tlsBootstrap.bind(tlsPort).sync(); + tlsBootstrap.bind(new InetSocketAddress(pulsar.getBindAddress(), tlsPort)).sync(); log.info("Started Pulsar Broker TLS service on port {}", tlsPort); } diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/PulsarStats.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/PulsarStats.java index fdaad39c55be7..4caa13b2c23ea 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/PulsarStats.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/PulsarStats.java @@ -66,7 +66,7 @@ public PulsarStats(PulsarService pulsar) { this.tempMetricsCollection = Lists.newArrayList(); this.metricsCollection = Lists.newArrayList(); this.brokerOperabilityMetrics = new BrokerOperabilityMetrics(pulsar.getConfiguration().getClusterName(), - pulsar.getHost()); + pulsar.getAdvertisedAddress()); } @Override diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/web/WebService.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/web/WebService.java index 1877a7d5851a5..93a9e5d558db0 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/web/WebService.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/web/WebService.java @@ -85,6 +85,7 @@ public WebService(ServiceConfiguration config, PulsarService pulsar) throws Puls ServerConnector connector = new PulsarServerConnector(server, 1, 1); connector.setPort(config.getWebServicePort()); + connector.setHost(pulsar.getBindAddress()); connectors.add(connector); if (config.isTlsEnabled()) { @@ -102,6 +103,7 @@ public WebService(ServiceConfiguration config, PulsarService pulsar) throws Puls sslCtxFactory.setWantClientAuth(true); ServerConnector tlsConnector = new PulsarServerConnector(server, 1, 1, sslCtxFactory); tlsConnector.setPort(config.getWebServicePortTls()); + tlsConnector.setHost(pulsar.getBindAddress()); connectors.add(tlsConnector); } diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/SLAMonitoringTest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/SLAMonitoringTest.java index 34257e9dfe56a..02eb42826b6a6 100644 --- a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/SLAMonitoringTest.java +++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/SLAMonitoringTest.java @@ -67,7 +67,6 @@ public class SLAMonitoringTest { private int[] brokerWebServicePorts = new int[BROKER_COUNT]; private int[] brokerNativeBrokerPorts = new int[BROKER_COUNT]; private URL[] brokerUrls = new URL[BROKER_COUNT]; - private String[] lookupAddresses = new String[BROKER_COUNT]; private PulsarService[] pulsarServices = new PulsarService[BROKER_COUNT]; private PulsarAdmin[] pulsarAdmins = new PulsarAdmin[BROKER_COUNT]; private ServiceConfiguration[] configurations = new ServiceConfiguration[BROKER_COUNT]; @@ -96,7 +95,6 @@ void setup() throws Exception { pulsarServices[i].start(); brokerUrls[i] = new URL("http://127.0.0.1" + ":" + brokerWebServicePorts[i]); - lookupAddresses[i] = pulsarServices[i].getHost() + ":" + config.getWebServicePort(); pulsarAdmins[i] = new PulsarAdmin(brokerUrls[i], (Authentication) null); } @@ -105,7 +103,7 @@ void setup() throws Exception { createProperty(pulsarAdmins[BROKER_COUNT - 1]); for (int i = 0; i < BROKER_COUNT; i++) { String destination = String.format("%s/%s/%s:%s", NamespaceService.SLA_NAMESPACE_PROPERTY, "my-cluster", - pulsarServices[i].getHost(), brokerWebServicePorts[i]); + pulsarServices[i].getAdvertisedAddress(), brokerWebServicePorts[i]); pulsarAdmins[0].namespaces().createNamespace(destination); } } @@ -175,13 +173,13 @@ public void testOwnershipViaAdminAfterSetup() { for (int i = 0; i < BROKER_COUNT; i++) { try { String destination = String.format("persistent://%s/%s/%s:%s/%s", - NamespaceService.SLA_NAMESPACE_PROPERTY, "my-cluster", pulsarServices[i].getHost(), + NamespaceService.SLA_NAMESPACE_PROPERTY, "my-cluster", pulsarServices[i].getAdvertisedAddress(), brokerWebServicePorts[i], "my-topic"); assertEquals(pulsarAdmins[0].lookups().lookupDestination(destination), - "pulsar://" + pulsarServices[i].getHost() + ":" + brokerNativeBrokerPorts[i]); + "pulsar://" + pulsarServices[i].getAdvertisedAddress() + ":" + brokerNativeBrokerPorts[i]); } catch (Exception e) { e.printStackTrace(); - fail("SLA Namespace should have been owned by the broker(" + "pulsar://" + pulsarServices[i].getHost() + fail("SLA Namespace should have been owned by the broker(" + "pulsar://" + pulsarServices[i].getAdvertisedAddress() + ":" + brokerNativeBrokerPorts[i] + ")"); } } @@ -200,7 +198,7 @@ public void testUnloadIfBrokerCrashes() { } String destination = String.format("persistent://%s/%s/%s:%s/%s", NamespaceService.SLA_NAMESPACE_PROPERTY, - "my-cluster", pulsarServices[crashIndex].getHost(), brokerWebServicePorts[crashIndex], "my-topic"); + "my-cluster", pulsarServices[crashIndex].getAdvertisedAddress(), brokerWebServicePorts[crashIndex], "my-topic"); log.info("Lookup for namespace {}", destination); @@ -209,7 +207,7 @@ public void testUnloadIfBrokerCrashes() { broker = pulsarAdmins[BROKER_COUNT - 1].lookups().lookupDestination(destination); log.info("{} Namespace is owned by {}", destination, broker); assertNotEquals(broker, - "pulsar://" + pulsarServices[crashIndex].getHost() + ":" + brokerNativeBrokerPorts[crashIndex]); + "pulsar://" + pulsarServices[crashIndex].getAdvertisedAddress() + ":" + brokerNativeBrokerPorts[crashIndex]); } catch (PulsarAdminException e) { e.printStackTrace(); fail("The SLA Monitor namespace should be owned by some other broker"); @@ -230,7 +228,7 @@ public void testUnloadIfBrokerCrashes() { broker = pulsarAdmins[0].lookups().lookupDestination(destination); log.info("{} Namespace is re-owned by {}", destination, broker); assertEquals(broker, - "pulsar://" + pulsarServices[crashIndex].getHost() + ":" + brokerNativeBrokerPorts[crashIndex]); + "pulsar://" + pulsarServices[crashIndex].getAdvertisedAddress() + ":" + brokerNativeBrokerPorts[crashIndex]); } catch (PulsarAdminException e) { e.printStackTrace(); fail("The SLA Monitor namespace should be reowned by the broker" + broker); diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/admin/AdminApiTest.java index 5deafe4fe7b60..0d3813f6ccf9d 100644 --- a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/admin/AdminApiTest.java +++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/admin/AdminApiTest.java @@ -120,7 +120,6 @@ public void setup() throws Exception { otherconfig.setBrokerServicePort(SECONDARY_BROKER_PORT); otherconfig.setWebServicePort(SECONDARY_BROKER_WEBSERVICE_PORT); otherconfig.setLoadBalancerEnabled(false); - otherconfig.setBindOnLocalhost(true); otherconfig.setClusterName("test"); otherPulsar = startBroker(otherconfig); @@ -353,7 +352,7 @@ public void brokers() throws Exception { Assert.assertEquals(1, nsMap.size()); for (String ns : nsMap.keySet()) { NamespaceOwnershipStatus nsStatus = nsMap.get(ns); - if (ns.equals(NamespaceService.getHeartbeatNamespace(pulsar.getHost(), pulsar.getConfiguration()) + if (ns.equals(NamespaceService.getHeartbeatNamespace(pulsar.getAdvertisedAddress(), pulsar.getConfiguration()) + "/0x00000000_0xffffffff")) { assertEquals(nsStatus.broker_assignment, BrokerAssignment.shared); assertFalse(nsStatus.is_controlled); diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/admin/AdminTest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/admin/AdminTest.java index b376db79b92a5..f0e3fb2544e57 100644 --- a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/admin/AdminTest.java +++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/admin/AdminTest.java @@ -500,7 +500,7 @@ void brokers() throws Exception { Set activeBrokers = brokers.getActiveBrokers("use"); assertEquals(activeBrokers.size(), 1); - assertEquals(activeBrokers, Sets.newHashSet("localhost:" + BROKER_WEBSERVICE_PORT)); + assertEquals(activeBrokers, Sets.newHashSet(pulsar.getAdvertisedAddress() + ":" + BROKER_WEBSERVICE_PORT)); } @Test diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/auth/MockedPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/auth/MockedPulsarServiceBaseTest.java index 7795b74c41553..af7bdc01a6a40 100644 --- a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/auth/MockedPulsarServiceBaseTest.java +++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/auth/MockedPulsarServiceBaseTest.java @@ -76,8 +76,8 @@ public MockedPulsarServiceBaseTest() { this.conf.setBrokerServicePortTls(BROKER_PORT_TLS); this.conf.setWebServicePort(BROKER_WEBSERVICE_PORT); this.conf.setWebServicePortTls(BROKER_WEBSERVICE_PORT_TLS); - this.conf.setBindOnLocalhost(true); this.conf.setClusterName("test"); + this.conf.setAdvertisedAddress("localhost"); // there are TLS tests in here, they need to use localhost because of the certificate } protected final void internalSetup() throws Exception { @@ -102,8 +102,8 @@ private final void init() throws Exception { startBroker(); - brokerUrl = new URL("http://localhost:" + BROKER_WEBSERVICE_PORT); - brokerUrlTls = new URL("https://localhost:" + BROKER_WEBSERVICE_PORT_TLS); + brokerUrl = new URL("http://" + pulsar.getAdvertisedAddress() + ":" + BROKER_WEBSERVICE_PORT); + brokerUrlTls = new URL("https://" + pulsar.getAdvertisedAddress() + ":" + BROKER_WEBSERVICE_PORT_TLS); admin = spy(new PulsarAdmin(brokerUrl, (Authentication) null)); } diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/loadbalance/LoadBalancerTest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/loadbalance/LoadBalancerTest.java index 2ab40ecf181b3..c0e173f070ae4 100644 --- a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/loadbalance/LoadBalancerTest.java +++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/loadbalance/LoadBalancerTest.java @@ -140,7 +140,7 @@ void setup() throws Exception { pulsarServices[i].start(); brokerUrls[i] = new URL("http://127.0.0.1" + ":" + brokerWebServicePorts[i]); - lookupAddresses[i] = pulsarServices[i].getHost() + ":" + config.getWebServicePort(); + lookupAddresses[i] = pulsarServices[i].getAdvertisedAddress() + ":" + config.getWebServicePort(); pulsarAdmins[i] = new PulsarAdmin(brokerUrls[i], (Authentication) null); } diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java index ca8dc358f7f22..f0d7144e97f3e 100644 --- a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java +++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java @@ -174,7 +174,7 @@ private void createNamespacePolicies(PulsarService pulsar) throws Exception { policyData.namespaces = new ArrayList(); policyData.namespaces.add("pulsar/use/primary-ns.*"); policyData.primary = new ArrayList(); - policyData.primary.add(pulsar1.getHost() + "*"); + policyData.primary.add(pulsar1.getAdvertisedAddress() + "*"); policyData.secondary = new ArrayList(); policyData.secondary.add("prod2-broker([78]).messaging.usw.example.co.*"); policyData.auto_failover_policy = new AutoFailoverPolicyData(); @@ -237,7 +237,7 @@ public void testPrimary() throws Exception { rd.put("bandwidthOut", new ResourceUsage(550 * 1024, 1024 * 1024)); ResourceUnit ru1 = new SimpleResourceUnit( - "http://" + pulsar1.getHost() + ":" + pulsar1.getConfiguration().getWebServicePort(), rd); + "http://" + pulsar1.getAdvertisedAddress() + ":" + pulsar1.getConfiguration().getWebServicePort(), rd); Set rus = new HashSet(); rus.add(ru1); LoadRanker lr = new ResourceAvailabilityRanker(); diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/namespace/OwnershipCacheTest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/namespace/OwnershipCacheTest.java index 85e119547253b..f1c8cdc70eafa 100644 --- a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/namespace/OwnershipCacheTest.java +++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/namespace/OwnershipCacheTest.java @@ -16,7 +16,6 @@ package com.yahoo.pulsar.broker.namespace; import static com.google.common.base.Preconditions.checkNotNull; -import static com.yahoo.pulsar.broker.PulsarService.host; import static com.yahoo.pulsar.broker.PulsarService.webAddress; import static org.mockito.Matchers.anyObject; import static org.mockito.Mockito.doReturn; @@ -80,7 +79,6 @@ public void setup() throws Exception { doReturn(nsService).when(pulsar).getNamespaceService(); doReturn(port).when(config).getBrokerServicePort(); doReturn(brokerService).when(pulsar).getBrokerService(); - doReturn(host(config)).when(pulsar).getHost(); doReturn(webAddress(config)).when(pulsar).getWebServiceAddress(); doReturn(selfBrokerUrl).when(pulsar).getBrokerServiceUrl(); } diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/AdvertisedAddressTest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/AdvertisedAddressTest.java new file mode 100644 index 0000000000000..014304157fd6b --- /dev/null +++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/AdvertisedAddressTest.java @@ -0,0 +1,75 @@ +/** + * Copyright 2016 Yahoo Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.yahoo.pulsar.broker.service; + +import com.yahoo.pulsar.broker.PulsarService; +import com.yahoo.pulsar.broker.ServiceConfiguration; +import com.yahoo.pulsar.zookeeper.LocalBookkeeperEnsemble; +import org.apache.zookeeper.data.Stat; +import org.json.JSONObject; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.nio.charset.StandardCharsets; + +public class AdvertisedAddressTest { + + LocalBookkeeperEnsemble bkEnsemble; + PulsarService pulsar; + + private final int ZOOKEEPER_PORT = 12759; + private final int BROKER_WEBSERVICE_PORT = 15782; + private final int BROKER_SERVICE_PORT = 16650; + + private final String advertisedAddress = "pulsar-usc.example.com"; + + @Before + public void setup() throws Exception { + bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, 5001); + bkEnsemble.start(); + ServiceConfiguration config = new ServiceConfiguration(); + config.setZookeeperServers("127.0.0.1" + ":" + ZOOKEEPER_PORT); + config.setWebServicePort(BROKER_WEBSERVICE_PORT); + config.setClusterName("usc"); + config.setBrokerServicePort(BROKER_SERVICE_PORT); + config.setAdvertisedAddress(advertisedAddress); + config.setManagedLedgerMaxEntriesPerLedger(5); + config.setManagedLedgerMinLedgerRolloverTimeMinutes(0); + pulsar = new PulsarService(config); + pulsar.start(); + } + + @After + public void shutdown() throws Exception { + pulsar.close(); + bkEnsemble.stop(); + } + + @Test + public void testAdvertisedAddress() throws Exception { + Assert.assertEquals( pulsar.getAdvertisedAddress(), advertisedAddress ); + Assert.assertEquals( pulsar.getBrokerServiceUrl(), String.format("pulsar://%s:%d", advertisedAddress, BROKER_SERVICE_PORT) ); + Assert.assertEquals( pulsar.getWebServiceAddress(), String.format("http://%s:%d", advertisedAddress, BROKER_WEBSERVICE_PORT) ); + String brokerZkPath = String.format("/loadbalance/brokers/%s:%d", pulsar.getAdvertisedAddress(), BROKER_WEBSERVICE_PORT); + String bkBrokerData = new String(bkEnsemble.getZkClient().getData(brokerZkPath, false, new Stat()), StandardCharsets.UTF_8); + JSONObject jsonBkBrokerData = new JSONObject(bkBrokerData); + Assert.assertEquals( jsonBkBrokerData.get("pulsarServiceUrl"), pulsar.getBrokerServiceUrl() ); + Assert.assertEquals( jsonBkBrokerData.get("webServiceUrl"), pulsar.getWebServiceAddress() ); + } + +} diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/BacklogQuotaManagerTest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/BacklogQuotaManagerTest.java index 03e266a959e3b..bd7e7199d3c09 100644 --- a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/BacklogQuotaManagerTest.java +++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/BacklogQuotaManagerTest.java @@ -84,7 +84,6 @@ void setup() throws Exception { config.setBrokerServicePort(BROKER_SERVICE_PORT); config.setAuthorizationEnabled(false); config.setAuthenticationEnabled(false); - config.setBindOnLocalhost(true); config.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA); config.setManagedLedgerMaxEntriesPerLedger(5); config.setManagedLedgerMinLedgerRolloverTimeMinutes(0); diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/BrokerServiceTest.java index 3260239485879..4288b7e2a400d 100644 --- a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/BrokerServiceTest.java +++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/BrokerServiceTest.java @@ -31,6 +31,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.json.JSONArray; +import org.json.JSONException; import org.json.JSONObject; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; @@ -213,11 +214,23 @@ public void testBrokerStatsMetrics() throws Exception { JSONArray metrics = brokerStatsClient.getMetrics(); assertEquals(metrics.length(), 4, metrics.toString()); - JSONObject obj = metrics.getJSONObject(2); - assertTrue(obj.getString("dimensions").contains("prop/use/ns-abc")); + // these metrics seem to be arriving in different order at different times... + // is the order really relevant here? + boolean namespaceDimensionFound = false; + boolean topicLoadTimesDimensionFound = false; + for ( int i=0; i