-
Notifications
You must be signed in to change notification settings - Fork 24.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Remove client connections from TcpTransport #31886
Conversation
Pinging @elastic/es-core-infra |
Hey @s1monw This is the result of some work that I've played around with over the last year. I know that you are assigned to the issue, so you might have your own plan. But I had created a number of branches based on this work at one point or another, so I thought I would push it up here. It is pretty rough currently. There are tests that have not been updated (due to changes in how mock transports work). Additionally, the TcpTransport still holds the connection profiles. That could be resolved in this PR or another. Essentially, we do not need to merge this. I just wanted to push it up here to show my thoughts on that issue. I see the options as:
Long term my view would be that you could have different connection managers for different pieces of functionality (normal cluster connections, ccs, other cross cluster stuff, etc). The connection manager could be |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like it. I left some comments. thanks for taking this on.
@@ -28,6 +28,32 @@ | |||
*/ | |||
public interface TransportConnectionListener { | |||
|
|||
interface NodeConnection { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i think these sub-interfaces are unnecessary. lets just stick with the top level one
@@ -154,6 +123,19 @@ default CircuitBreaker getInFlightRequestBreaker() { | |||
void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options) throws | |||
IOException, TransportException; | |||
|
|||
default boolean supportsPing() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lets just return a boolean from sendPing and remove this method?
throw new UnsupportedOperationException("Not support by this connection type"); | ||
} | ||
|
||
default boolean isClosed() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't see this used or implemented, maybe it's not necessary?
|
||
import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap; | ||
|
||
public class ConnectionManager implements Closeable { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we add some javadocs?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would also love to see a unittest for this one. that would be awesome!
out.writeInt(TcpTransport.PING_DATA_SIZE); | ||
pingMessage = out.bytes(); | ||
} catch (IOException e) { | ||
throw new IllegalStateException(e.getMessage(), e); // won't happen |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let's throw an assertion error instead. I konw it's preexisting.
@Override | ||
public Connection openConnection(DiscoveryNode node, ConnectionProfile profile) throws IOException { | ||
return getConnection(node); | ||
// @Override |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this can go?
connectedNodes.remove(node); | ||
listener.onNodeDisconnected(node); | ||
} | ||
// @Override |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this can go as well?
public boolean nodeConnected(DiscoveryNode node) { | ||
return node.equals(connectedNode); | ||
} | ||
// @Override |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove?
@s1monw - Alright. I will work on getting this PR production ready. As you will have noticed (from CI and commented out stuff), there is still some work to do on our whole "mocking network" constructs for tests to pass. |
@tbrooks8 thanks man, assign me as a reviewer again once you are ready |
@s1monw - I think this is ready for another view. The moving the connection profile out of the |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I left some minors. This is a great step forward. Lets move on! LGTM
} | ||
|
||
@Override | ||
public void close() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you should protect from double closing here.
return false; | ||
} | ||
|
||
void addCloseListener(ActionListener<Void> listener); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you explain in what case the ActinListener is called and what method of it?
Jenkins, run sample packaging tests |
@s1monw - I merged this. I started working on the back port to get far enough to know that it is going to be a little tricky. It will require the back port of some other PRs, etc. And I can do that, before I went through the process I wanted to check if this is something we want for 6.x? The issue (#31835) does not have targeted versions. |
…e-types * elastic/master: (199 commits) Watcher: Remove unused hipchat render method (elastic#32211) Watcher: Remove extraneous auth classes (elastic#32300) Watcher: migrate PagerDuty v1 events API to v2 API (elastic#32285) [TEST] Select free port for Minio (elastic#32837) MINOR: Remove `IndexTemplateFilter` (elastic#32841) Core: Add java time version of rounding classes (elastic#32641) Aggregations/HL Rest client fix: missing scores (elastic#32774) HLRC: Add Delete License API (elastic#32586) INGEST: Create Index Before Pipeline Execute (elastic#32786) Fix NOOP bulk updates (elastic#32819) Remove client connections from TcpTransport (elastic#31886) Increase logging testRetentionPolicyChangeDuringRecovery AwaitsFix case-functions.sql-spec Mute security-cli tests in FIPS JVM (elastic#32812) SCRIPTING: Support BucketAggScript return null (elastic#32811) Unmute WildFly tests in FIPS JVM (elastic#32814) [TEST] Force a stop to save rollup state before continuing (elastic#32787) [test] disable packaging tests for suse boxes Mute IndicesRequestIT#testBulk [ML][DOCS] Refer to rules feature as custom rules (elastic#32785) ...
…listeners * elastic/master: Watcher: Remove unused hipchat render method (elastic#32211) Watcher: Remove extraneous auth classes (elastic#32300) Watcher: migrate PagerDuty v1 events API to v2 API (elastic#32285) [TEST] Select free port for Minio (elastic#32837) MINOR: Remove `IndexTemplateFilter` (elastic#32841) Core: Add java time version of rounding classes (elastic#32641) Aggregations/HL Rest client fix: missing scores (elastic#32774) HLRC: Add Delete License API (elastic#32586) INGEST: Create Index Before Pipeline Execute (elastic#32786) Fix NOOP bulk updates (elastic#32819) Remove client connections from TcpTransport (elastic#31886) Increase logging testRetentionPolicyChangeDuringRecovery AwaitsFix case-functions.sql-spec Mute security-cli tests in FIPS JVM (elastic#32812) SCRIPTING: Support BucketAggScript return null (elastic#32811) Unmute WildFly tests in FIPS JVM (elastic#32814) [TEST] Force a stop to save rollup state before continuing (elastic#32787) [test] disable packaging tests for suse boxes Mute IndicesRequestIT#testBulk [ML][DOCS] Refer to rules feature as custom rules (elastic#32785)
@tbrooks8 I think we can stick with 7.x for this. WDYT? |
This is related to elastic#31835. This commit adds a connection manager that manages client connections to other nodes. This means that the TcpTransport no longer maintains a map of nodes that it is connected to.
This is a followup to #31886. After that commit the TransportConnectionListener had to be propogated to both the Transport and the ConnectionManager. This commit moves that listener to completely live in the ConnectionManager. The request and response related methods are moved to a TransportMessageListener. That listener continues to live in the Transport class.
StubbableConnectionManager connectionManager = new StubbableConnectionManager(new ConnectionManager(settings, this, threadPool), | ||
settings, this, threadPool); | ||
connectionManager.setDefaultNodeConnectedBehavior((cm, discoveryNode) -> true); | ||
connectionManager.setDefaultConnectBehavior((cm, discoveryNode) -> new Connection() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Bit late to the party here, but could this have been the following?
connectionManager.setDefaultConnectBehavior((cm, discoveryNode) -> openConnection(discoveryNode, null));
That'd let me continue to override the default connection behaviour by subclassing CapturingTransport
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah I think that would work. Feel free to make that change in a PR where you need to override the method.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great. I opened #33012.
This is a followup to #31886. After that commit the TransportConnectionListener had to be propogated to both the Transport and the ConnectionManager. This commit moves that listener to completely live in the ConnectionManager. The request and response related methods are moved to a TransportMessageListener. That listener continues to live in the Transport class.
This is related to #31835. This commit adds a connection manager that
manages client connections to other nodes. This means that the
TcpTransport no longer maintains a map of nodes that it is connected
to.