Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Removes bindOnLocalhost=boolean. Adds bindAddress and advertisedAddress. #26

Merged
merged 20 commits into from
Sep 26, 2016
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
c7e3a7a
Changes bindOnLocalhost=boolean to bindAddress. Introduces advertised…
radekg Sep 20, 2016
cc08ed3
Removes advertisement from web socket config.
radekg Sep 20, 2016
6fa126f
ENsure services bind on the bind address.
radekg Sep 20, 2016
3745033
Change bind address and advertised address hostname resolution to as …
radekg Sep 20, 2016
393e10e
Remove unnecessary LOG in ServiceConfiguration.
radekg Sep 20, 2016
3ed746e
Removed unused imports.
radekg Sep 20, 2016
1bec010
Corrections in comments.
radekg Sep 20, 2016
bf27260
ServiceConfigurration in tests not loaded from Properties.
radekg Sep 20, 2016
368bb8d
Merge branch 'master' into advertisedAddress
radekg Sep 21, 2016
10ccdb6
Fixes the hanging tests.
radekg Sep 23, 2016
2a0920a
Merge branch 'master' into advertisedAddress
radekg Sep 23, 2016
98f01cd
Fixes to make the tests pass.
radekg Sep 23, 2016
94dde8e
Removing the setBindAddress and setAdvertisedAddress from tests. Unit…
radekg Sep 23, 2016
2b9c25c
Fixed TLS tests. These need to advertise as localhost.
radekg Sep 24, 2016
18d3c91
Missing license header.
radekg Sep 24, 2016
2683759
Fixes BrokerServiceTest.testBrokerStatsMetrics test.
radekg Sep 24, 2016
1db4d34
Updates comments for advertisedAddress in the configuration files.
radekg Sep 24, 2016
e51b961
Unit tests passing.
radekg Sep 26, 2016
1ac44bf
Last place where the getHost() needs to be replaced with getAdvertise…
radekg Sep 26, 2016
d4c76f5
Address the final review point, adjust the metrics test, rename Pulsa…
radekg Sep 26, 2016
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,11 @@ webServicePortTls=8443
# Enable the WebSocket API service in broker
webSocketServiceEnabled=false

# Control whether to bind directly on localhost rather than on normal hostname
bindOnLocalhost=false
# Hostname or IP address the service binds on, default is InetAddress.getLocalHost().getHostName().
bindAddress=
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Leaving empty for clarity.

Copy link
Contributor

@merlimat merlimat Sep 20, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this be 0.0.0.0 instead of hostname by default?

Copy link
Contributor Author

@radekg radekg Sep 20, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess that's a personal preference. Some people like things wide open by default, some do not. I don't mind making it 0.0.0.0.


# Hostname or IP address the service advertises to the outside world. If not set, the value of bindAddress is used.
advertisedAddress=
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Leaving empty for clarity.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the bind address was 0.0.0.0, then defaulting to bind address might not work. Should we default to hostname for advertisedAddress ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed in 3745033

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update the comment with "default to hostname"


# Name of the cluster to which this broker belongs to
clusterName=
Expand Down
7 changes: 5 additions & 2 deletions conf/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,11 @@ brokerServicePort=6650
# Port to use to server HTTP request
webServicePort=8080

# Control whether to bind directly on localhost rather than on normal hostname
bindOnLocalhost=true
# Hostname or IP address the service binds on, default is InetAddress.getLocalHost().getHostName().
bindAddress=

# Hostname or IP address the service advertises to the outside world. If not set, the value of bindAddress is used.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

default to hostname

advertisedAddress=

# Name of the cluster to which this broker belongs to
clusterName=standalone
Expand Down
4 changes: 2 additions & 2 deletions conf/websocket.conf
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ globalZookeeperServers=
# Port to use to server HTTP request
webServicePort=8080

# Control whether to bind directly on localhost rather than on normal hostname
bindOnLocalhost=false
# Hostname or IP address the service binds on, default is InetAddress.getLocalHost().getHostName().
bindAddress=

# Name of the pulsar cluster to connect to
clusterName=
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,13 @@ public class ServiceConfiguration {
private int webServicePort = 8080;
// Port to use to server HTTPS request
private int webServicePortTls = 8443;
// Control whether to bind directly on localhost rather than on normal
// hostname
private boolean bindOnLocalhost = false;

// Hostname or IP address the service binds on.
// If not set, InetAddress.getLocalHost().getHostName() will be used.
private String bindAddress;
// Controls which hostname is advertised to the discovery service via ZooKeeper.
// If not set, bindAddress is used.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the comment is out of date now, since it defaults to hostname

private String advertisedAddress;

// Enable the WebSocket API service
private boolean webSocketServiceEnabled = false;
Expand Down Expand Up @@ -291,12 +295,20 @@ public void setWebServicePortTls(int webServicePortTls) {
this.webServicePortTls = webServicePortTls;
}

public boolean isBindOnLocalhost() {
return bindOnLocalhost;
public String getBindAddress() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Preferably, the ServiceConfiguration should be a simple POJO. This initialization of the bind address is potentially at risk if multiple threads are initializing. It'd be better to return null (or even better Optional<String>) and have the caller cope with the missing value.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was trying to avoid throws on this method as the caller would have to require try / catch everywhere where this is used. Unless we can settle on RuntimeException or IllegalArgumentException?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, scratch that, that's a silly idea. I'll just re-throw the exception and handle where used.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed in 3745033

Copy link
Contributor Author

@radekg radekg Sep 21, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@merlimat Why would multiple threads try to initialize a single instance of ServiceConfiguration? Is the service configuration stored somewhere as static?

return this.bindAddress;
}

public void setBindAddress(String bindAddress) {
this.bindAddress = bindAddress;
}

public String getAdvertisedAddress() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above, we should just return what was configured, and in any case, the fallback here should be on hostname rather than the bind address.

return this.advertisedAddress;
}

public void setBindOnLocalhost(boolean bindOnLocalhost) {
this.bindOnLocalhost = bindOnLocalhost;
public void setAdvertisedAddress(String advertisedAddress) {
this.advertisedAddress = advertisedAddress;
}

public boolean isWebSocketServiceEnabled() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.io.IOException;
import java.net.InetAddress;
import java.net.URL;
import java.net.UnknownHostException;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
Expand Down Expand Up @@ -98,6 +99,7 @@ public class PulsarService implements AutoCloseable {
private PulsarAdmin adminClient = null;
private ZooKeeperClientFactory zkClientFactory = null;
private final String host;
private final String advertisedAddress;
private final String webServiceAddress;
private final String webServiceAddressTls;
private final String brokerServiceUrl;
Expand All @@ -119,6 +121,7 @@ public enum State {
public PulsarService(ServiceConfiguration config) {
state = State.Init;
this.host = host(config);
this.advertisedAddress = advertisedHost(config);
this.webServiceAddress = webAddress(config);
this.webServiceAddressTls = webAddressTls(config);
this.brokerServiceUrl = brokerUrl(config);
Expand Down Expand Up @@ -565,41 +568,56 @@ public MessagingServiceShutdownHook getShutdownService() {
/**
* Derive the host
*
* @param isBindOnLocalhost
* @return
* @return Hostname or IP address the service binds on.
*/
public static String host(ServiceConfiguration config) {
Copy link
Contributor

@merlimat merlimat Sep 26, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this host(...) method could probably be removed now, given that we can directly use config.getBindAddress() which defaults to a non-null value.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@merlimat is it okay to change getHost() to getBindAddress()? Also, would you like me to rebase?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure getBindAddress() sounds good, but we probably don't even need to have the static method, since we could just call config.getBindAddress() as it is.

For rebasing, I will squash it anyway when merging. I don't expect particular conflicts with current master.

try {
if (!config.isBindOnLocalhost()) {
if (config.getBindAddress() == null) {
try {
return InetAddress.getLocalHost().getHostName();
} else {
return "localhost";
} catch (UnknownHostException ex) {
LOG.error(ex.getMessage(), ex);
throw new IllegalStateException("Failed to resolve localhost name.", ex);
}
} catch (Exception e) {
LOG.error(e.getMessage(), e);
throw new IllegalStateException("failed to find host", e);
}
return config.getBindAddress();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These 2 methods should have the above logic to check whether bindAddress and advertisedAddress were configured.

}

/**
* Advertised service host.
*
* @return Hostname or IP address the service advertises to the outside world.
*/
public static String advertisedHost(ServiceConfiguration config) {
if (config.getAdvertisedAddress() == null) {
try {
return InetAddress.getLocalHost().getHostName();
} catch (UnknownHostException ex) {
LOG.error(ex.getMessage(), ex);
throw new IllegalStateException("Failed to resolve localhost name.", ex);
}
}
return config.getAdvertisedAddress();
}

public static String brokerUrl(ServiceConfiguration config) {
return "pulsar://" + host(config) + ":" + config.getBrokerServicePort();
return "pulsar://" + advertisedHost(config) + ":" + config.getBrokerServicePort();
}

public static String brokerUrlTls(ServiceConfiguration config) {
if (config.isTlsEnabled()) {
return "pulsar://" + host(config) + ":" + config.getBrokerServicePortTls();
return "pulsar://" + advertisedHost(config) + ":" + config.getBrokerServicePortTls();
} else {
return "";
}
}

public static String webAddress(ServiceConfiguration config) {
return String.format("http://%s:%d", host(config), config.getWebServicePort());
return String.format("http://%s:%d", advertisedHost(config), config.getWebServicePort());
}

public static String webAddressTls(ServiceConfiguration config) {
if (config.isTlsEnabled()) {
return String.format("https://%s:%d", host(config), config.getWebServicePortTls());
return String.format("https://%s:%d", advertisedHost(config), config.getWebServicePortTls());
} else {
return "";
}
Expand All @@ -609,6 +627,10 @@ public String getHost() {
return host;
}

public String getAdvertisedAddress() {
return advertisedAddress;
}

public String getWebServiceAddress() {
return webServiceAddress;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -220,13 +221,13 @@ public void start() throws Exception {

bootstrap.childHandler(new PulsarChannelInitializer(this, serviceConfig, false));
// Bind and start to accept incoming connections.
bootstrap.bind(port).sync();
bootstrap.bind(new InetSocketAddress(pulsar.getHost(), port)).sync();
log.info("Started Pulsar Broker service on port {}", port);

if (serviceConfig.isTlsEnabled()) {
ServerBootstrap tlsBootstrap = bootstrap.clone();
tlsBootstrap.childHandler(new PulsarChannelInitializer(this, serviceConfig, true));
tlsBootstrap.bind(tlsPort).sync();
tlsBootstrap.bind(new InetSocketAddress(pulsar.getHost(), tlsPort)).sync();
log.info("Started Pulsar Broker TLS service on port {}", tlsPort);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ public WebService(ServiceConfiguration config, PulsarService pulsar) throws Puls

ServerConnector connector = new PulsarServerConnector(server, 1, 1);
connector.setPort(config.getWebServicePort());
connector.setHost(pulsar.getHost());
connectors.add(connector);

if (config.isTlsEnabled()) {
Expand All @@ -102,6 +103,7 @@ public WebService(ServiceConfiguration config, PulsarService pulsar) throws Puls
sslCtxFactory.setWantClientAuth(true);
ServerConnector tlsConnector = new PulsarServerConnector(server, 1, 1, sslCtxFactory);
tlsConnector.setPort(config.getWebServicePortTls());
tlsConnector.setHost(pulsar.getHost());
connectors.add(tlsConnector);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ public void setup() throws Exception {
otherconfig.setBrokerServicePort(SECONDARY_BROKER_PORT);
otherconfig.setWebServicePort(SECONDARY_BROKER_WEBSERVICE_PORT);
otherconfig.setLoadBalancerEnabled(false);
otherconfig.setBindOnLocalhost(true);
otherconfig.setBindAddress("localhost");
otherconfig.setClusterName("test");

otherPulsar = startBroker(otherconfig);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public MockedPulsarServiceBaseTest() {
this.conf.setBrokerServicePortTls(BROKER_PORT_TLS);
this.conf.setWebServicePort(BROKER_WEBSERVICE_PORT);
this.conf.setWebServicePortTls(BROKER_WEBSERVICE_PORT_TLS);
this.conf.setBindOnLocalhost(true);
this.conf.setBindAddress("localhost");
this.conf.setClusterName("test");
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/**
* Copyright 2016 Yahoo Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.yahoo.pulsar.broker.service;

import com.yahoo.pulsar.broker.PulsarService;
import com.yahoo.pulsar.broker.ServiceConfiguration;
import com.yahoo.pulsar.zookeeper.LocalBookkeeperEnsemble;
import org.apache.zookeeper.data.Stat;
import org.json.JSONObject;
import org.json.JSONStringer;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;

import java.nio.charset.StandardCharsets;
import java.util.Properties;

import static com.yahoo.pulsar.broker.ServiceConfigurationLoader.create;

public class AdvertisedAddressTest {

LocalBookkeeperEnsemble bkEnsemble;
PulsarService pulsar;

private final int ZOOKEEPER_PORT = 12759;
private final int BROKER_WEBSERVICE_PORT = 15782;
private final int BROKER_SERVICE_PORT = 16650;

private final String bindAddress = "127.0.0.1";
private final String advertisedAddress = "pulsar-usc.example.com";

@Before
public void setup() throws Exception {
bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, 5001);
bkEnsemble.start();
ServiceConfiguration config = create(new Properties(System.getProperties()));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as we are not loading ServiceConfiguration from map/properties: it is fine to directly instantiate
ServiceConfiguration config = new ServiceConfiguration()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed.

config.setZookeeperServers("127.0.0.1" + ":" + ZOOKEEPER_PORT);
config.setWebServicePort(BROKER_WEBSERVICE_PORT);
config.setClusterName("usc");
config.setBrokerServicePort(BROKER_SERVICE_PORT);
config.setBindAddress(bindAddress);
config.setAdvertisedAddress(advertisedAddress);
config.setManagedLedgerMaxEntriesPerLedger(5);
config.setManagedLedgerMinLedgerRolloverTimeMinutes(0);
pulsar = new PulsarService(config);
pulsar.start();
}

@After
public void shutdown() throws Exception {
pulsar.close();
bkEnsemble.stop();
}

@Test
public void testAdvertisedAddress() throws Exception {
Assert.assertEquals( pulsar.getHost(), bindAddress );
Assert.assertEquals( pulsar.getAdvertisedAddress(), advertisedAddress );
Assert.assertEquals( pulsar.getBrokerServiceUrl(), String.format("pulsar://%s:%d", advertisedAddress, BROKER_SERVICE_PORT) );
Assert.assertEquals( pulsar.getWebServiceAddress(), String.format("http://%s:%d", advertisedAddress, BROKER_WEBSERVICE_PORT) );
String brokerZkPath = String.format("/loadbalance/brokers/%s:%d", bindAddress, BROKER_WEBSERVICE_PORT);
String bkBrokerData = new String(bkEnsemble.getZkClient().getData(brokerZkPath, false, new Stat()), StandardCharsets.UTF_8);
JSONObject jsonBkBrokerData = new JSONObject(bkBrokerData);
Assert.assertEquals( jsonBkBrokerData.get("pulsarServiceUrl"), pulsar.getBrokerServiceUrl() );
Assert.assertEquals( jsonBkBrokerData.get("webServiceUrl"), pulsar.getWebServiceAddress() );
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ void setup() throws Exception {
config.setBrokerServicePort(BROKER_SERVICE_PORT);
config.setAuthorizationEnabled(false);
config.setAuthenticationEnabled(false);
config.setBindOnLocalhost(true);
config.setBindAddress("localhost");
config.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA);
config.setManagedLedgerMaxEntriesPerLedger(5);
config.setManagedLedgerMinLedgerRolloverTimeMinutes(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ void setup() throws Exception {
config1.setBrokerServicePurgeInactiveFrequencyInSeconds(
inSec(getBrokerServicePurgeInactiveFrequency(), TimeUnit.SECONDS));
config1.setBrokerServicePort(PortManager.nextFreePort());
config1.setBindOnLocalhost(true);
config1.setBindAddress("localhost");
pulsar1 = new PulsarService(config1);
pulsar1.start();
ns1 = pulsar1.getBrokerService();
Expand All @@ -138,7 +138,7 @@ void setup() throws Exception {
config2.setBrokerServicePurgeInactiveFrequencyInSeconds(
inSec(getBrokerServicePurgeInactiveFrequency(), TimeUnit.SECONDS));
config2.setBrokerServicePort(PortManager.nextFreePort());
config2.setBindOnLocalhost(true);
config2.setBindAddress("localhost");
pulsar2 = new PulsarService(config2);
pulsar2.start();
ns2 = pulsar2.getBrokerService();
Expand All @@ -163,7 +163,7 @@ void setup() throws Exception {
config3.setBrokerServicePurgeInactiveFrequencyInSeconds(
inSec(getBrokerServicePurgeInactiveFrequency(), TimeUnit.SECONDS));
config3.setBrokerServicePort(PortManager.nextFreePort());
config3.setBindOnLocalhost(true);
config3.setBindAddress("localhost");
pulsar3 = new PulsarService(config3);
pulsar3.start();
ns3 = pulsar3.getBrokerService();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ brokerServicePort=6650
brokerServicePortTls=6651
webServicePort=8080
webServicePortTls=4443
bindOnLocalhost=false
bindAddress=
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Left empty for clarity.

advertisedAddress=
clusterName="test_cluster"
brokerShutdownTimeoutMs=3000
backlogQuotaCheckEnabled=true
Expand Down