Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

JAVA-3131: Add #retrieve method to EndPoint for when caller does not … #1735

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ protected GssApiAuthenticator(
SUPPORTED_MECHANISMS,
options.getAuthorizationId(),
protocol,
((InetSocketAddress) endPoint.resolve()).getAddress().getCanonicalHostName(),
((InetSocketAddress) endPoint.retrieve()).getAddress().getCanonicalHostName(),
options.getSaslProperties(),
null);
} catch (LoginException | SaslException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ private Map<String, SessionStateForNode> getConnectedNodes() {
return pools.entrySet().stream()
.collect(
Collectors.toMap(
entry -> AddressFormatter.nullSafeToString(entry.getKey().getEndPoint().resolve()),
entry -> AddressFormatter.nullSafeToString(entry.getKey().getEndPoint().retrieve()),
this::constructSessionStateForNode));
}

Expand All @@ -315,7 +315,7 @@ private InsightsStartupData createStartupData() {
.withContactPoints(
getResolvedContactPoints(
driverContext.getMetadataManager().getContactPoints().stream()
.map(n -> n.getEndPoint().resolve())
.map(n -> n.getEndPoint().retrieve())
.filter(InetSocketAddress.class::isInstance)
.map(InetSocketAddress.class::cast)
.collect(Collectors.toSet())))
Expand Down Expand Up @@ -456,7 +456,7 @@ private PoolSizeByHostDistance getPoolSizeByHostDistance() {
}

private String getControlConnectionSocketAddress() {
SocketAddress controlConnectionAddress = controlConnection.channel().getEndPoint().resolve();
SocketAddress controlConnectionAddress = controlConnection.channel().getEndPoint().retrieve();
return AddressFormatter.nullSafeToString(controlConnectionAddress);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,17 @@ public interface EndPoint {
@NonNull
SocketAddress resolve();

/**
* Returns a possibly unresolved instance to a socket address.
*
* <p>This should be called when the address does not need to be proactively resolved. For example
* if the node hostname or port number is needed.
*/
@NonNull
default SocketAddress retrieve() {
return resolve();
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm wondering if it makes sense to provide a default implementation for the new method. It's unlikely we'll be adding a new endpoint type anytime soon but failing to distinguish between "look up the addresses again" and "give me the addresses you got last time you looked them up" is what got us into this situation in the first place. I kinda feel like having this as a default might lend itself to landing back in exactly that situation.


/**
* Returns an alternate string representation for use in node-level metric names.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public ProgrammaticSslEngineFactory(
@Override
public SSLEngine newSslEngine(@NonNull EndPoint remoteEndpoint) {
SSLEngine engine;
SocketAddress remoteAddress = remoteEndpoint.resolve();
SocketAddress remoteAddress = remoteEndpoint.retrieve();
if (remoteAddress instanceof InetSocketAddress) {
InetSocketAddress socketAddress = (InetSocketAddress) remoteAddress;
engine = sslContext.createSSLEngine(socketAddress.getHostName(), socketAddress.getPort());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ public DefaultEndPoint(InetSocketAddress address) {
@NonNull
@Override
public InetSocketAddress resolve() {
return retrieve();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: save yourself the method call overhead here and just return address directly... ?

}

@Override
public InetSocketAddress retrieve() {
return address;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -472,7 +472,7 @@ private Optional<NodeInfo> findInPeers(
// We save it the first time we get a control connection channel.
private void savePort(DriverChannel channel) {
if (port < 0) {
SocketAddress address = channel.getEndPoint().resolve();
SocketAddress address = channel.getEndPoint().retrieve();
if (address instanceof InetSocketAddress) {
port = ((InetSocketAddress) address).getPort();
}
Expand Down Expand Up @@ -518,7 +518,7 @@ protected InetSocketAddress getBroadcastRpcAddress(
}
InetSocketAddress broadcastRpcAddress =
new InetSocketAddress(broadcastRpcInetAddress, broadcastRpcPort);
if (row.contains("peer") && broadcastRpcAddress.equals(localEndPoint.resolve())) {
if (row.contains("peer") && broadcastRpcAddress.equals(localEndPoint.retrieve())) {
// JAVA-2303: if the peer is actually the control node, ignore that peer as it is likely
// a misconfiguration problem.
LOG.warn(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package com.datastax.oss.driver.internal.core.metadata;

import com.datastax.oss.driver.api.core.metadata.EndPoint;
import com.datastax.oss.driver.shaded.guava.common.annotations.VisibleForTesting;
import com.datastax.oss.driver.shaded.guava.common.primitives.UnsignedBytes;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.net.InetAddress;
Expand All @@ -26,10 +27,14 @@
import java.util.Arrays;
import java.util.Comparator;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;

public class SniEndPoint implements EndPoint {
private static final AtomicLong OFFSET = new AtomicLong();
// initialize offset to random position to avoid all clients starting at the same index
@VisibleForTesting
static final AtomicInteger OFFSET =
new AtomicInteger(ThreadLocalRandom.current().nextInt(0, 1024));

private final InetSocketAddress proxyAddress;
private final String serverName;
Expand All @@ -55,7 +60,7 @@ public String getServerName() {
@Override
public InetSocketAddress resolve() {
try {
InetAddress[] aRecords = InetAddress.getAllByName(proxyAddress.getHostName());
InetAddress[] aRecords = resolveARecords();
if (aRecords.length == 0) {
// Probably never happens, but the JDK docs don't explicitly say so
throw new IllegalArgumentException(
Expand All @@ -64,14 +69,32 @@ public InetSocketAddress resolve() {
// The order of the returned address is unspecified. Sort by IP to make sure we get a true
// round-robin
Arrays.sort(aRecords, IP_COMPARATOR);
int index = (aRecords.length == 1) ? 0 : (int) OFFSET.getAndIncrement() % aRecords.length;
return new InetSocketAddress(aRecords[index], proxyAddress.getPort());

// get next offset value, reset OFFSET if wrapped around to negative
int nextOffset = OFFSET.getAndIncrement();
if (nextOffset < 0) {
// if negative set OFFSET to 1 and nextOffset to 0, else simulate getAndIncrement()
nextOffset = OFFSET.updateAndGet(v -> v < 0 ? 1 : v + 1) - 1;
}

return new InetSocketAddress(aRecords[nextOffset % aRecords.length], proxyAddress.getPort());
} catch (UnknownHostException e) {
throw new IllegalArgumentException(
"Could not resolve proxy address " + proxyAddress.getHostName(), e);
}
}

@VisibleForTesting
InetAddress[] resolveARecords() throws UnknownHostException {
// moving static call to method to allow mocking in tests
return InetAddress.getAllByName(proxyAddress.getHostName());
}

@Override
public InetSocketAddress retrieve() {
return proxyAddress;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't we want to cache the last value returned by resolve() and return that here instead?

}

@Override
public boolean equals(Object other) {
if (other == this) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public DefaultSslEngineFactory(DriverContext driverContext) {
@Override
public SSLEngine newSslEngine(@NonNull EndPoint remoteEndpoint) {
SSLEngine engine;
SocketAddress remoteAddress = remoteEndpoint.resolve();
SocketAddress remoteAddress = remoteEndpoint.retrieve();
if (remoteAddress instanceof InetSocketAddress) {
InetSocketAddress socketAddress = (InetSocketAddress) remoteAddress;
engine = sslContext.createSSLEngine(socketAddress.getHostName(), socketAddress.getPort());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public SSLEngine newSslEngine(@NonNull EndPoint remoteEndpoint) {
this.getClass().getSimpleName()));
}
SniEndPoint sniEndPoint = (SniEndPoint) remoteEndpoint;
InetSocketAddress address = sniEndPoint.resolve();
InetSocketAddress address = sniEndPoint.retrieve();
String sniServerName = sniEndPoint.getServerName();

// When hostname verification is enabled (with setEndpointIdentificationAlgorithm), the SSL
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -485,7 +485,7 @@ private DefaultDriverContext mockDefaultDriverContext() throws UnknownHostExcept
when(context.getProtocolVersion()).thenReturn(DSE_V2);
DefaultNode contactPoint = mock(DefaultNode.class);
EndPoint contactEndPoint = mock(EndPoint.class);
when(contactEndPoint.resolve()).thenReturn(new InetSocketAddress("127.0.0.1", 9999));
when(contactEndPoint.retrieve()).thenReturn(new InetSocketAddress("127.0.0.1", 9999));
when(contactPoint.getEndPoint()).thenReturn(contactEndPoint);
when(manager.getContactPoints()).thenReturn(ImmutableSet.of(contactPoint));

Expand All @@ -501,7 +501,7 @@ private DefaultDriverContext mockDefaultDriverContext() throws UnknownHostExcept
ControlConnection controlConnection = mock(ControlConnection.class);
DriverChannel channel = mock(DriverChannel.class);
EndPoint controlConnectionEndpoint = mock(EndPoint.class);
when(controlConnectionEndpoint.resolve()).thenReturn(new InetSocketAddress("127.0.0.1", 10));
when(controlConnectionEndpoint.retrieve()).thenReturn(new InetSocketAddress("127.0.0.1", 10));

when(channel.getEndPoint()).thenReturn(controlConnectionEndpoint);
when(channel.localAddress()).thenReturn(new InetSocketAddress("127.0.0.1", 10));
Expand All @@ -513,15 +513,15 @@ private DefaultDriverContext mockDefaultDriverContext() throws UnknownHostExcept
private void mockConnectionPools(DefaultDriverContext driverContext) {
Node node1 = mock(Node.class);
EndPoint endPoint1 = mock(EndPoint.class);
when(endPoint1.resolve()).thenReturn(new InetSocketAddress("127.0.0.1", 10));
when(endPoint1.retrieve()).thenReturn(new InetSocketAddress("127.0.0.1", 10));
when(node1.getEndPoint()).thenReturn(endPoint1);
when(node1.getOpenConnections()).thenReturn(1);
ChannelPool channelPool1 = mock(ChannelPool.class);
when(channelPool1.getInFlight()).thenReturn(10);

Node node2 = mock(Node.class);
EndPoint endPoint2 = mock(EndPoint.class);
when(endPoint2.resolve()).thenReturn(new InetSocketAddress("127.0.0.1", 20));
when(endPoint2.retrieve()).thenReturn(new InetSocketAddress("127.0.0.1", 20));
when(node2.getEndPoint()).thenReturn(endPoint2);
when(node2.getOpenConnections()).thenReturn(2);
ChannelPool channelPool2 = mock(ChannelPool.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ public void should_add_existing_node_with_same_id_but_different_endpoint() {
new DefaultMetadata(
ImmutableMap.of(node1.getHostId(), node1), Collections.emptyMap(), null, null);
DefaultEndPoint newEndPoint = TestNodeFactory.newEndPoint(2);
InetSocketAddress newBroadcastRpcAddress = newEndPoint.resolve();
InetSocketAddress newBroadcastRpcAddress = newEndPoint.retrieve();
UUID newSchemaVersion = Uuids.random();
DefaultNodeInfo newNodeInfo =
DefaultNodeInfo.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,7 @@ public void should_refresh_node_list_from_local_and_peers() {
assertThat(info1.getEndPoint()).isEqualTo(node1.getEndPoint());
assertThat(info1.getDatacenter()).isEqualTo("dc1");
NodeInfo info3 = iterator.next();
assertThat(info3.getEndPoint().resolve())
assertThat(info3.getEndPoint().retrieve())
.isEqualTo(new InetSocketAddress("127.0.0.3", 9042));
assertThat(info3.getDatacenter()).isEqualTo("dc3");
NodeInfo info2 = iterator.next();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ public void should_ignore_node_refresh_if_topology_monitor_does_not_have_info()
@Test
public void should_add_node() {
// Given
InetSocketAddress broadcastRpcAddress = ((InetSocketAddress) END_POINT2.resolve());
InetSocketAddress broadcastRpcAddress = ((InetSocketAddress) END_POINT2.retrieve());
NodeInfo info = mock(NodeInfo.class);
when(info.getBroadcastRpcAddress()).thenReturn(Optional.of(broadcastRpcAddress));
when(topologyMonitor.getNewNodeInfo(broadcastRpcAddress))
Expand All @@ -238,8 +238,8 @@ public void should_add_node() {
@Test
public void should_not_add_node_if_broadcast_rpc_address_does_not_match() {
// Given
InetSocketAddress broadcastRpcAddress2 = ((InetSocketAddress) END_POINT2.resolve());
InetSocketAddress broadcastRpcAddress3 = ((InetSocketAddress) END_POINT3.resolve());
InetSocketAddress broadcastRpcAddress2 = ((InetSocketAddress) END_POINT2.retrieve());
InetSocketAddress broadcastRpcAddress3 = ((InetSocketAddress) END_POINT3.retrieve());
NodeInfo info = mock(NodeInfo.class);
when(topologyMonitor.getNewNodeInfo(broadcastRpcAddress2))
.thenReturn(CompletableFuture.completedFuture(Optional.of(info)));
Expand All @@ -259,7 +259,7 @@ public void should_not_add_node_if_broadcast_rpc_address_does_not_match() {
@Test
public void should_not_add_node_if_topology_monitor_does_not_have_info() {
// Given
InetSocketAddress broadcastRpcAddress2 = ((InetSocketAddress) END_POINT2.resolve());
InetSocketAddress broadcastRpcAddress2 = ((InetSocketAddress) END_POINT2.retrieve());
when(topologyMonitor.getNewNodeInfo(broadcastRpcAddress2))
.thenReturn(CompletableFuture.completedFuture(Optional.empty()));

Expand All @@ -274,7 +274,7 @@ public void should_not_add_node_if_topology_monitor_does_not_have_info() {
@Test
public void should_remove_node() {
// Given
InetSocketAddress broadcastRpcAddress2 = ((InetSocketAddress) END_POINT2.resolve());
InetSocketAddress broadcastRpcAddress2 = ((InetSocketAddress) END_POINT2.retrieve());

// When
metadataManager.removeNode(broadcastRpcAddress2);
Expand Down
Loading