Remove connectToNodeLight and replace it with a connection profile #21799
The Transport#connectToNodeLight concept is confusing and not very flexible, nor is it really testable at the unit-test level. This commit cleans up the code used to connect to nodes and simplifies transport implementations so they share more code. It also allows connecting to nodes with custom profiles if needed; for instance, future improvements could connect to/from non-data nodes without dedicated bulk and recovery connections.
test this please
for (int i = 0; i < channels.length; i++) {
    assert iterator.hasNext();
    ChannelFuture future = iterator.next();
    future.awaitUninterruptibly((long) (connectTimeout.millis() * 1.5));
Maybe it could be nice to have a connection timeout configurable by ConnectionProfile? Like not using 30s timeouts when we ping nodes every 3s (see #19719 (comment))
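A minimal sketch of what a per-profile connect timeout could look like, under stated assumptions: `TimeoutProfileSketch`, `Profile`, `effectiveTimeout` and `awaitMillis` are hypothetical names and not part of this PR or of Elasticsearch. The 3s and 30s values are only the figures mentioned in the comment above, and the 1.5 factor mirrors the `awaitUninterruptibly` call in the snippet under review.

```java
import java.time.Duration;
import java.util.Optional;

// Hypothetical sketch: none of these names are the actual Elasticsearch API.
final class TimeoutProfileSketch {

    /** A profile that may carry its own connect timeout. */
    static final class Profile {
        private final Duration connectTimeout; // null means "use the transport default"

        Profile(Duration connectTimeout) {
            this.connectTimeout = connectTimeout;
        }

        Optional<Duration> connectTimeout() {
            return Optional.ofNullable(connectTimeout);
        }
    }

    /** Picks the profile's timeout if set, otherwise the transport-wide default. */
    static Duration effectiveTimeout(Profile profile, Duration transportDefault) {
        return profile.connectTimeout().orElse(transportDefault);
    }

    /** Applies the same 1.5x grace factor as the reviewed await call. */
    static long awaitMillis(Duration timeout) {
        return (long) (timeout.toMillis() * 1.5);
    }

    public static void main(String[] args) {
        Duration transportDefault = Duration.ofSeconds(30);
        Profile pingProfile = new Profile(Duration.ofSeconds(3));
        Profile defaultProfile = new Profile(null);

        // Wait budget for a short-lived ping connection vs. a default connection.
        System.out.println(awaitMillis(effectiveTimeout(pingProfile, transportDefault)));
        System.out.println(awaitMillis(effectiveTimeout(defaultProfile, transportDefault)));
    }
}
```

With a fallback like this, a ping profile would wait 4.5s instead of 45s, while profiles that set no timeout keep the existing behavior.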
Good idea, very good idea. I will create a follow-up for this. WDYT @tlrx?
+1
@tlrx I looked into this and one problem is that the connect timeout is per Bootstrap on the netty layer. This essentially means we need a dedicated Bootstrap for each connection timeout value. I don't think we should do that, maybe there is a better solution to this? I wonder if we should look into adding this option to netty?
@tlrx scratch that, I think I found a way to make this work. It's a bit involved, so it will definitely be a separate PR.
great!
@s1monw yes
test this please
as a followup I'd like to do stuff like this:
diff --git a/core/src/main/java/org/elasticsearch/transport/TcpTransport.java b/core/src/main/java/org/elasticsearch/transport/TcpTransport.java
index 66dd217..e0c1d3a 100644
--- a/core/src/main/java/org/elasticsearch/transport/TcpTransport.java
+++ b/core/src/main/java/org/elasticsearch/transport/TcpTransport.java
@@ -81,6 +81,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumMap;
+import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -205,9 +206,20 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
ConnectionProfile.Builder builder = new ConnectionProfile.Builder();
builder.addConnections(connectionsPerNodeBulk, TransportRequestOptions.Type.BULK);
builder.addConnections(connectionsPerNodePing, TransportRequestOptions.Type.PING);
- builder.addConnections(connectionsPerNodeRecovery, TransportRequestOptions.Type.RECOVERY);
- builder.addConnections(connectionsPerNodeReg, TransportRequestOptions.Type.REG);
- builder.addConnections(connectionsPerNodeState, TransportRequestOptions.Type.STATE);
+ Set<TransportRequestOptions.Type> regSet = EnumSet.of(TransportRequestOptions.Type.REG);
+ if (DiscoveryNode.isMasterNode(settings)) {
+ builder.addConnections(connectionsPerNodeState, TransportRequestOptions.Type.STATE);
+ } else {
+ // if we are not master eligible we don't need a dedicated channel to publish the state
+ regSet.add(TransportRequestOptions.Type.STATE);
+ }
+ if (DiscoveryNode.isDataNode(settings)) {
+ builder.addConnections(connectionsPerNodeRecovery, TransportRequestOptions.Type.RECOVERY);
+ } else {
+ // if we are not a data-node we don't need any dedicated channels for recovery
+ regSet.add(TransportRequestOptions.Type.RECOVERY);
+ }
+ builder.addConnections(connectionsPerNodeReg, regSet.toArray(new TransportRequestOptions.Type[regSet.size()]));
defaultConnectionProfile = builder.build();
}
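The idea in the diff above (fold STATE and RECOVERY into the regular channel pool when the local node does not need dedicated channels for them) can be sketched in isolation. This is an illustrative sketch only: `RoleBasedTypesSketch`, its `Type` enum (a stand-in for `TransportRequestOptions.Type`) and both method names are assumptions, and the two booleans stand in for `DiscoveryNode.isMasterNode(settings)` and `DiscoveryNode.isDataNode(settings)`.

```java
import java.util.EnumSet;

// Hypothetical sketch of the role-based grouping proposed in the diff.
final class RoleBasedTypesSketch {

    // Stand-in for TransportRequestOptions.Type.
    enum Type { BULK, PING, RECOVERY, REG, STATE }

    /** Types that share the regular (REG) channels on this node. */
    static EnumSet<Type> regularChannelTypes(boolean masterEligible, boolean dataNode) {
        EnumSet<Type> regSet = EnumSet.of(Type.REG);
        if (!masterEligible) {
            // not master eligible: no dedicated channel to publish the state
            regSet.add(Type.STATE);
        }
        if (!dataNode) {
            // not a data node: no dedicated channels for recovery
            regSet.add(Type.RECOVERY);
        }
        return regSet;
    }

    /** Types that still get their own dedicated channels. */
    static EnumSet<Type> dedicatedChannelTypes(boolean masterEligible, boolean dataNode) {
        return EnumSet.complementOf(regularChannelTypes(masterEligible, dataNode));
    }

    public static void main(String[] args) {
        // A node that is neither master eligible nor a data node.
        System.out.println(regularChannelTypes(false, false));   // [RECOVERY, REG, STATE]
        System.out.println(dedicatedChannelTypes(false, false)); // [BULK, PING]
        // A master-eligible data node keeps every dedicated pool.
        System.out.println(regularChannelTypes(true, true));     // [REG]
    }
}
```

Every type still ends up in exactly one group, so a builder that requires all types to be registered would still be satisfied.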
I like it @s1monw. I left a few comments.
 * @see #connectToChannelsLight(DiscoveryNode)
 */
protected abstract NodeChannels connectToChannels(DiscoveryNode node) throws IOException;
protected abstract NodeChannels connectToChannels(DiscoveryNode node, ConnectionProfile handles) throws IOException;
Nit: handles -> connectionProfile.
TransportRequestOptions.Type.STATE)), 1);

private final List<ConnectionTypeHandle> handles;
private final int numConnection;
Nit: numConnection -> numConnections.
/**
 * Returns the total number of connections for this profile
 */
public int getNumConnection() {
Nit: getNumConnection -> getNumConnections.
}
for (TransportRequestOptions.Type type : types) {
    if (addedTypes.contains(type)) {
        throw new IllegalArgumentException("type [" + type + "] is already registered");
I think that this should be an IllegalStateException.
my reasoning is that the argument would introduce this state but I won't let it hence the ISE
Okay.
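To make the exception choice discussed above concrete, here is a minimal sketch of a duplicate-type guard that rejects the offending argument before any builder state changes. It is not the `ConnectionProfile.Builder` from this PR: `DuplicateTypeGuardSketch`, its `Type` enum and the validate-then-mutate ordering are assumptions made for illustration.

```java
import java.util.EnumSet;

// Hypothetical sketch; not the ConnectionProfile.Builder from this PR.
final class DuplicateTypeGuardSketch {

    // Stand-in for TransportRequestOptions.Type.
    enum Type { BULK, PING, RECOVERY, REG, STATE }

    private final EnumSet<Type> addedTypes = EnumSet.noneOf(Type.class);
    private int totalConnections = 0;

    void addConnections(int numConnections, Type... types) {
        if (types == null || types.length == 0) {
            throw new IllegalArgumentException("types must not be null or empty");
        }
        // Validate every type first so a rejected call leaves the builder untouched.
        EnumSet<Type> incoming = EnumSet.noneOf(Type.class);
        for (Type type : types) {
            if (addedTypes.contains(type) || !incoming.add(type)) {
                throw new IllegalArgumentException("type [" + type + "] is already registered");
            }
        }
        addedTypes.addAll(incoming);
        totalConnections += numConnections;
    }

    EnumSet<Type> addedTypes() {
        return addedTypes.clone();
    }

    int totalConnections() {
        return totalConnections;
    }

    public static void main(String[] args) {
        DuplicateTypeGuardSketch builder = new DuplicateTypeGuardSketch();
        builder.addConnections(3, Type.REG);
        try {
            builder.addConnections(1, Type.STATE, Type.REG);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage()); // type [REG] is already registered
        }
        System.out.println(builder.addedTypes()); // [REG]
    }
}
```

Because the bad argument is refused up front, the builder never reaches an inconsistent state, which is one way to read the case for `IllegalArgumentException` here.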
offset += numConnections;
}

public ConnectionProfile build() {
Can you add a Javadoc for this documenting that all types have to have been added?
@jasontedor I pushed a new commit and replied to one comment
I left one more comment @s1monw.
@@ -79,6 +79,10 @@ public void addConnections(int numConnections, TransportRequestOptions.Type... t
offset += numConnections;
}

/**
 * Creates a new {@link ConnectionProfile} based on the added connections.
 * @throws IllegalArgumentException if any of the {@link org.elasticsearch.transport.TransportRequestOptions.Type} enum is missing
This Javadoc is incorrect; the method throws an IllegalStateException (which in this case I think is correct). 😄
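For readers following the thread, a rough sketch of a builder whose `build()` fails with `IllegalStateException` when some type was never added, and which assigns each group of types a contiguous slice of the channel array via a running offset. All names here (`ProfileBuilderSketch`, `Handle`, `Profile`, `Builder`, the exception message) are hypothetical, the `Type` enum is a stand-in for `TransportRequestOptions.Type`, and the duplicate-type check is left out for brevity.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;

// Hypothetical sketch; names and message text are not taken from Elasticsearch.
final class ProfileBuilderSketch {

    // Stand-in for TransportRequestOptions.Type.
    enum Type { BULK, PING, RECOVERY, REG, STATE }

    /** A contiguous slice [offset, offset + length) of the channel array shared by some types. */
    static final class Handle {
        final int offset;
        final int length;
        final EnumSet<Type> types;

        Handle(int offset, int length, EnumSet<Type> types) {
            this.offset = offset;
            this.length = length;
            this.types = types;
        }
    }

    static final class Profile {
        final List<Handle> handles;
        final int numConnections;

        Profile(List<Handle> handles, int numConnections) {
            this.handles = handles;
            this.numConnections = numConnections;
        }
    }

    static final class Builder {
        private final List<Handle> handles = new ArrayList<>();
        private final EnumSet<Type> addedTypes = EnumSet.noneOf(Type.class);
        private int offset = 0;

        Builder addConnections(int numConnections, Type first, Type... rest) {
            EnumSet<Type> types = EnumSet.of(first, rest);
            handles.add(new Handle(offset, numConnections, types));
            addedTypes.addAll(types);
            offset += numConnections; // the next group starts after this slice
            return this;
        }

        /** Builds the profile; every Type must have been added by now. */
        Profile build() {
            EnumSet<Type> missing = EnumSet.complementOf(addedTypes);
            if (!missing.isEmpty()) {
                // The builder itself is incomplete, hence a state error rather than an argument error.
                throw new IllegalStateException("missing connection types: " + missing);
            }
            return new Profile(Collections.unmodifiableList(new ArrayList<>(handles)), offset);
        }
    }

    public static void main(String[] args) {
        Profile profile = new Builder()
            .addConnections(3, Type.BULK)
            .addConnections(1, Type.PING)
            .addConnections(6, Type.REG, Type.STATE, Type.RECOVERY)
            .build();
        System.out.println(profile.numConnections); // 10
    }
}
```

Here `build()` takes no argument that could be blamed for the failure, which fits the reviewer's point that `IllegalStateException` is the right exception to document for it.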
@jasontedor fixed the javadocs issue
LGTM.