Simplify Unicast Zen Ping #22277
looks great, much simpler! I left a bunch of comments.
@@ -24,7 +24,6 @@
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
💥
unicastZenPingExecutorService,
logger,
configuredHosts,
limitPortCounts,
transportService,
() -> UNICAST_NODE_PREFIX + unicastNodeIdGenerator.incrementAndGet() + "#",
UNICAST_NODE_PREFIX,
resolveTimeout);
} catch (InterruptedException e) {
please restore the interrupt status here?
this is how it was and we do throw an exception, thus processing the interrupt?
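The two positions in this exchange are compatible: a handler can both restore the interrupt flag and throw. A minimal sketch follows, assuming a hypothetical `awaitResolution` helper that blocks on a latch; it is not the code from this PR.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

final class InterruptExample {
    // Hypothetical stand-in for a blocking resolution step; not the PR's code.
    static void awaitResolution(CountDownLatch latch) {
        try {
            latch.await(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            // Catching InterruptedException clears the thread's interrupt flag.
            // Re-set it so callers further up the stack can still observe the interrupt,
            // then surface the failure as an unchecked exception.
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }
}
```

Whether the flag needs restoring depends on whether anything above the throw site checks it; restoring it is the conservative choice.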
final AbstractRunnable pingSender = new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
if (e instanceof AlreadyClosedException) {
maybe `(e instanceof AlreadyClosedException) == false`?
yep. This morphed - I used to have a log there but got annoyed with it (just noise).
public PingCollection pingCollection() {
return pingCollection;
public List<DiscoveryNode> getSeedNodes() {
checkIfClosed();
we call it `ensureOpen` everywhere can we do the same here?
good one. Will change.
}
}

public synchronized Connection addConnectionIfNeeded(TransportAddress address, Connection newConnection) {
hmm that looks weird. Can we maybe use a `KeyedLock` when we open the connections with IP and port or something like this?
yeah, I wanted to have the simplest construct as it was a rare collision. With the latest code I actually think it's impossible (I dedup on addresses and the connections are private to the pinging round). Will remove.
turns out we do need this protection or something similar. I took another approach, which I think you'd like better.
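For readers unfamiliar with the keyed-lock idea raised in this thread, here is a minimal sketch of locking per key (for example per address) rather than on the whole object. `KeyedLockExample` is a hypothetical class written for illustration; it is not the Elasticsearch `KeyedLock` and, as a simplification, never removes unused locks.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

final class KeyedLockExample<K> {
    private final ConcurrentHashMap<K, ReentrantLock> locks = new ConcurrentHashMap<>();

    // Runs the action while holding the lock for this key only,
    // so work on different keys does not contend.
    // Simplification: locks are never removed from the map.
    void withLock(K key, Runnable action) {
        ReentrantLock lock = locks.computeIfAbsent(key, k -> new ReentrantLock());
        lock.lock();
        try {
            action.run();
        } finally {
            lock.unlock();
        }
    }
}
```

Compared with a `synchronized` method, two threads connecting to different addresses do not block each other, while two threads racing on the same address are serialized.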
}

if (connection == null) {
logger.trace("[{}] connecting (light) to {}", pingingRound.id(), node);
do we need this trace log here and if so can we fix it to say `temporarily` or something like this
I adapted the log message
} finally {
latch.countDown();
logger.trace("[{}] received response from {}: {}", pingingRound.id(), node, Arrays.toString(response.pingResponses));
if (pingingRound.isClosed() == false) {
just flip it then you don't need to negate
sure
}
} finally {
latch.countDown();
logger.trace("[{}] received response from {}: {}", pingingRound.id(), node, Arrays.toString(response.pingResponses));
if you keep the trace maybe use a logging guard here?
sure thing, will add.
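The point of a logging guard here is that `Arrays.toString(...)` is evaluated before the logger is called, even when trace logging is off. A minimal sketch follows, using `java.util.logging` so it stays self-contained (the PR itself uses a different logger); `traceResponses` and its boolean return value are hypothetical and exist only to make the guard's effect observable.

```java
import java.util.Arrays;
import java.util.logging.Level;
import java.util.logging.Logger;

final class LoggingGuardExample {
    private static final Logger LOGGER = Logger.getLogger(LoggingGuardExample.class.getName());

    // Returns true if the message arguments were actually built.
    static boolean traceResponses(String roundId, Object[] responses) {
        // Guard: skip the Arrays.toString call entirely unless the finest level is enabled.
        if (LOGGER.isLoggable(Level.FINEST)) {
            LOGGER.log(Level.FINEST, "[{0}] received responses: {1}",
                new Object[] { roundId, Arrays.toString(responses) });
            return true;
        }
        return false;
    }
}
```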
*/
public static ConnectionProfile getLightProfileWithTimeout(@Nullable TimeValue connectTimeout,
@Nullable TimeValue handshakeTimeout) {
return new ConnectionProfile(
I wonder if we should do this. I think we should move the `LIGHT_PROFILE` into tests somewhere and then require every special use to build its own. The problem I have here is that the `getLightProfileWithTimeout` shares one connection across all uses. I think in the case of ping we should only use 1 connection for `PING` and 0 for the others. That will cause an exception if it's used in a wrong context. Makes sense?
I tried to implement your suggestion and I think it looks good. will push shortly.
* @throws ConnectTransportException if the connection failed
* @throws IllegalStateException if the handshake failed
*/
public DiscoveryNode connectToNodeAndHandshake(
++
@s1monw I pushed more commits addressing your feedback. Let me know what you think.
import static org.hamcrest.Matchers.hasSize;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;

@TestLogging("org.elasticsearch.transport:TRACE,org.elasticsearch.discovery.zen:TRACE")
This logging was initially added to just `testSimplePings` to chase a race. The race has not reproduced since adding this logging. I think that we should drop the logging and then address it if the race comes back, since you've changed how these things are handled.
removed.
left some minor comments, LGTM otherwise
public Connection getOrConnect(DiscoveryNode node) throws IOException {
Connection result;
try (Releasable ignore = connectionLock.acquire(node.getAddress())) {
result = tempConnections.get(node.getAddress());
maybe use `computeIfAbsent()`?
the problem is the IOException that can be thrown while making a connection.
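The objection can be made concrete: `Map.computeIfAbsent` takes a `java.util.function.Function`, whose `apply` method declares no checked exceptions, so a mapping function that opens a connection and may throw `IOException` does not compile without wrapping. A minimal sketch of the explicit get-then-put alternative, assuming a hypothetical `Opener` interface and plain strings in place of real connections:

```java
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

final class GetOrConnectExample {
    // Hypothetical opener whose checked exception is the sticking point.
    interface Opener {
        String open(String address) throws IOException;
    }

    private final Map<String, String> connections = new HashMap<>();

    // Lookup and insert are written out explicitly so the IOException
    // can propagate to the caller unchanged.
    synchronized String getOrConnect(String address, Opener opener) throws IOException {
        String existing = connections.get(address);
        if (existing == null) {
            existing = opener.open(address); // may throw; nothing is cached in that case
            connections.put(address, existing);
        }
        return existing;
    }
}
```

The sketch synchronizes on the whole object for brevity; the snippet under review holds a per-address lock instead.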
@@ -447,7 +460,7 @@ protected void sendPings(final TimeValue timeout, final PingingRound pingingRoun
// dedup by address
final Map<TransportAddress, DiscoveryNode> uniqueNodesByAddress =
Stream.concat(pingingRound.getSeedNodes().stream(), nodesFromResponses.stream())
.collect(Collectors.toMap(DiscoveryNode::getAddress, n -> n, (n1, n2) -> n1));
.collect(Collectors.toMap(DiscoveryNode::getAddress, node -> node, (n1, n2) -> n1));
you didn't like `Function.identity()`?
I did but I used it wrong (as a function reference). Using it right works of course .. zzzz
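The slip mentioned here is easy to make: `Function.identity()` is a static method that returns a function, so it has to be called rather than passed as a method reference. A minimal sketch of the dedup-by-address collector, assuming a hypothetical `Node` class as a stand-in for `DiscoveryNode` and plain strings for addresses:

```java
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class DedupExample {
    // Hypothetical stand-in for DiscoveryNode: just an id and an address.
    static final class Node {
        final String id;
        final String address;
        Node(String id, String address) { this.id = id; this.address = address; }
        String getAddress() { return address; }
    }

    static Map<String, Node> uniqueByAddress(List<Node> seeds, List<Node> fromResponses) {
        return Stream.concat(seeds.stream(), fromResponses.stream())
            // Function.identity() is called, not referenced: it returns the function t -> t.
            // The merge function keeps the first node seen for an address.
            .collect(Collectors.toMap(Node::getAddress, Function.identity(), (n1, n2) -> n1));
    }
}
```

Because the seed nodes come first in the concatenated stream, a seed node wins over a node learned from a response when both share an address.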
} else {
logger.trace("[{}] skipping received response from {}. already closed", pingingRound.id(), node);
Arrays.asList(response.pingResponses).forEach(pingingRound::addPingResponseToCollection);
`Arrays.stream(response.pingResponses)` would not materialize it
+1
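For reference, the JDK method for streaming over an array is `Arrays.stream`. A minimal sketch of the suggested form, with a hypothetical `collect` helper and strings standing in for ping responses:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class ArrayStreamExample {
    static List<String> collect(String[] responses) {
        List<String> collected = new ArrayList<>();
        // Arrays.stream iterates the array directly, without an intermediate List wrapper.
        Arrays.stream(responses).forEach(collected::add);
        return collected;
    }
}
```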
43f8287 to 8959ae6
thx @s1monw. I'll wait a day before backporting
* master:
  Simplify Unicast Zen Ping (elastic#22277)
  Replace IndicesQueriesRegistry (elastic#22289)
  Fixed document mistake and fit for 5.1.1 API
  [TEST] improve error message in ESTestCase#assertWarnings
  [TEST] remove deleted test classes from checkstyle suppressions
  [TEST] make ESSingleNodeTestCase tests repeatable (elastic#22283)
  Link for setting page in elasticsearch.yml is outdated
  Factor out sort values from InternalSearchHit (elastic#22080)
  Add ID for percolate query to Java API docs
  x_refresh.yaml tests should use unique index names and doc ids to ease debugging
  IndicesStoreIntegrationIT should not use start recovery sending as an indication that the recovery started
  Added base class for testing aggregators and some initial tests for `terms`, `top_hits` and `min` aggregations.
  Add link to foreach processor to ingest-attachment.asciidoc
…r being closed This may cause them to leak. Provisioning for it was made in #22277 but sadly a crucial ensureOpen call was forgotten
…otification task Not doing this made it difficult to establish a happens-before relationship between connecting to a node and adding a listener, causing test code like this to fail sporadically:
```
// connection to reuse
handleA.transportService.connectToNode(handleB.node);
// install a listener to check that no new connections are made
handleA.transportService.addConnectionListener(new TransportConnectionListener() {
    @Override
    public void onConnectionOpened(DiscoveryNode node) {
        fail("should not open any connections. got [" + node + "]");
    }
});
```
relates to #22277
The `UnicastZenPing` shows its age and is the result of many small changes. The current state of affairs is confusing and hard to reason about. This PR cleans it up (while following the same original intentions). Highlights of the changes are:

1) Clear 3 round flow - no interleaving of scheduling.
2) The previous implementation did a best effort attempt to wait for ongoing pings to be sent and completed. The pings were guaranteed to complete because each used the total ping duration as a timeout. This did make it hard to reason about the total ping duration and the flow of the code. All of this is removed now and pings should just complete within the given duration or not be counted (note that it was very handy for testing, but I moved the needed sync logic to the test).
3) Because of (2) the pinging scheduling changed a bit, to give a chance for the last round to complete. We now ping at the beginning, 1/3 and 2/3 of the duration.
4) To offset for (3) a bit, incoming ping requests are now added to ongoing ping collections.
5) UnicastZenPing never establishes full blown connections (but does reuse them if there). Relates to #22120
6) Discovery host providers are only used once per pinging round. Closes #21739
7) Usage of the ability to open a connection without connecting to a node ( #22194 ) and shorter connection timeouts helps with connections piling up. Closes #19370
8) Beefed up testing and sped them up.
9) Removed light profile from production code.
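The schedule described in (3) can be illustrated with a small calculation: three rounds at offsets 0, 1/3 and 2/3 of the total duration, which leaves the final third for the last round to complete. This is only a sketch of the arithmetic; `roundOffsets` is a hypothetical helper and not code from the PR.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

final class PingScheduleExample {
    // Returns the offsets from the start at which each round would be sent:
    // round i of n is sent at i/n of the total duration.
    static List<Duration> roundOffsets(Duration total, int rounds) {
        List<Duration> offsets = new ArrayList<>();
        for (int i = 0; i < rounds; i++) {
            offsets.add(total.multipliedBy(i).dividedBy(rounds));
        }
        return offsets;
    }
}
```

For a 3 second ping duration and 3 rounds this gives offsets of 0s, 1s and 2s.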
this is now backported to 5.x as well.