Move transport decoding and aggregation to server #48263
Conversation
Thanks for the iteration Tim. This looks pretty good to me now! Found a few more small questions/suggestions.
server/src/main/java/org/elasticsearch/transport/TransportDecompressor.java (resolved)
server/src/main/java/org/elasticsearch/transport/InboundAggregator.java (outdated, resolved)
server/src/main/java/org/elasticsearch/transport/TransportDecompressor.java (outdated, resolved)
Thanks Tim. I've left some nits and smaller questions, looking very good though.
server/src/main/java/org/elasticsearch/transport/InboundDecoder.java (outdated, resolved)
final int remaining = composite.length() - bytesConsumed;
if (remaining != 0) {
    try (ReleasableBytesReference slice = composite.retainedSlice(bytesConsumed, remaining)) {
        final int bytesDecoded = decoder.decode(slice, fragments::add);
Why are we adding to a list here and not instantly passing things on? Does this not mean that we need to decode (and possibly decompress) the full slice before reacting to the request (e.g. invoke a breaker)?
We're attempting to decode the entire message in a single go since that will be the most common scenario.
The exact path for how messages can be circuit broken will be built out in follow-up work. Compression is going to be kind of an issue no matter what since we have to decompress to know the decompressed size (or put some type of header indicating the final size).
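For readers following along, here is a minimal, self-contained sketch of the consumption loop under discussion (plain `ByteBuffer` and a hypothetical `Decoder` interface stand in for `ReleasableBytesReference` and `InboundDecoder`; this is not the code added by the PR): decode as much of the buffered content as possible in one pass, collect the fragments into a list, and only then forward them.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Simplified sketch of the decode loop discussed above; not the actual
// Elasticsearch classes.
final class DecodeLoopSketch {

    // Stand-in for InboundDecoder: consumes bytes, emits fragments, and
    // returns how many bytes it consumed.
    interface Decoder {
        int decode(ByteBuffer slice, Consumer<Object> fragmentConsumer);
    }

    static void handleBytes(ByteBuffer composite, Decoder decoder, Consumer<Object> upstream) {
        final List<Object> fragments = new ArrayList<>();
        int bytesConsumed = 0;
        while (true) {
            final int remaining = composite.limit() - bytesConsumed;
            if (remaining == 0) {
                break;
            }
            // Slice off everything not yet consumed and let the decoder take
            // as much of it as it can in a single call (the common case is
            // that the whole message is already buffered).
            final ByteBuffer slice = composite.duplicate();
            slice.position(bytesConsumed);
            final int bytesDecoded = decoder.decode(slice.slice(), fragments::add);
            if (bytesDecoded == 0) {
                break; // not enough bytes buffered yet for the next fragment
            }
            bytesConsumed += bytesDecoded;
        }
        // Fragment sizes (and, for compressed content, the decompressed size)
        // are only known after decoding, which is why circuit breaking on the
        // true message size has to happen at or after this point.
        fragments.forEach(upstream);
    }
}
```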
server/src/main/java/org/elasticsearch/transport/InboundAggregator.java (outdated, resolved)
server/src/main/java/org/elasticsearch/transport/TransportLogger.java (outdated, resolved)
Looks good from my end, just one small open question.
    cleanDecodeState();
}

private void consumeNonEmptyContent(ReleasableBytesReference content, Consumer<Object> fragmentConsumer) {
Do we even need this method now that we fixed the decompression to not produce any empty content? It seems impossible to get here with empty content?
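For reference, the guard being questioned boils down to something like this simplified sketch (plain `ByteBuffer` instead of `ReleasableBytesReference`, method bodies condensed); if the decompressor can no longer emit zero-length chunks, the empty branch becomes dead code and the dedicated method could be inlined.

```java
import java.nio.ByteBuffer;
import java.util.function.Consumer;

// Condensed sketch of the empty-content guard under discussion; not the
// actual method bodies from the PR.
final class ContentGuardSketch {

    static void forwardFragments(ByteBuffer content, Consumer<Object> fragmentConsumer) {
        if (content.remaining() == 0) {
            // With decompression fixed to never emit empty chunks, this branch
            // should be unreachable.
            return;
        }
        consumeNonEmptyContent(content, fragmentConsumer);
    }

    private static void consumeNonEmptyContent(ByteBuffer content, Consumer<Object> fragmentConsumer) {
        fragmentConsumer.accept(content);
    }
}
```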
LGTM
* upstream/master: (447 commits)
  Adjust BWC version on node roles being sorted
  Deprecate node local storage setting (elastic#54374)
  Remove Unused BaseNodeRequest (elastic#54323)
  Decouple environment from DiscoveryNode (elastic#54373)
  Ensure that the output of node roles are sorted (elastic#54376)
  Do not stash environment in security (elastic#54372)
  Do not stash environment in machine learning (elastic#54371)
  Clean up how pipeline aggs check for multi-bucket (elastic#54161)
  Remove toString from Painless user tree nodes (elastic#54117)
  Docs: Use splitOnToken instead of custom function (elastic#48408)
  bwc: enable script cache in node stats (elastic#54362)
  bwc: Mute for script cache in node stats (elastic#54359)
  Test to enforce response to invalid data stream names
  Scripting: Use randomValueOtherThan in stats test (elastic#54356)
  Move transport decoding and aggregation to server (elastic#48263)
  Mute ScriptServiceTests.testContextCacheStats (elastic#54352)
  Improve checkstyle performance and readability (elastic#54308)
  Disable Gradle daemon when executing Windows packaging tests (elastic#54316)
  [Docs] Minor fix for SubmitAsyncSearchRequest.keepOnCompletion javadoc (elastic#54325)
  [DOCS] Fix typos in top metrics agg docs (elastic#54299)
  ...

Conflicts:
  server/src/main/java/org/elasticsearch/index/IndexModule.java
  server/src/main/java/org/elasticsearch/index/IndexService.java
  server/src/main/java/org/elasticsearch/indices/IndicesService.java
  server/src/test/java/org/elasticsearch/index/IndexModuleTests.java
This is a follow-up to #48263. It moves the inbound stats tracking inside of the InboundPipeline.
Currently all of our transport protocol decoding and aggregation occurs
in the individual transport modules. This means that each implementation
(test, netty, nio) must implement this logic. Additionally, it means
that the entire message has been read from the network before the server
package receives it.
This commit creates a pipeline in server which can be passed arbitrary
bytes to handle. Internally, the pipeline will decode, decompress, and
aggregate the messages. Additionally, this allows us to run many
megabytes of bytes through the pipeline in tests to ensure that the
logic works.
This work will enable future work:
- Circuit breaking or backoff logic based on message type and bytes in the content aggregator.
- Sharing bytes with the application layer using the ref counted releasable network bytes.
- Improved network monitoring based specifically on channels.
Finally, this fixes the bug where we do not circuit break on the correct
message size when compression is enabled.
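As a rough illustration of the idea (names and framing below are hypothetical, not the API this commit adds): the network layer hands the pipeline arbitrary byte chunks, and the pipeline buffers, decodes, and aggregates them into complete messages, which is what lets tests push many megabytes through it in arbitrary chunk sizes. The sketch covers only the aggregation step, with a made-up length-prefixed framing and no decompression.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the aggregation half of the inbound pipeline idea;
// it is not the InboundPipeline added by this commit and uses a made-up
// [4-byte length][payload] framing with no decompression.
final class PipelineSketch {

    record Message(byte[] payload) {}

    private ByteArrayOutputStream pending = new ByteArrayOutputStream();

    // Feed an arbitrary chunk of bytes; returns the messages completed so far.
    List<Message> handleBytes(byte[] chunk) {
        pending.write(chunk, 0, chunk.length);
        final List<Message> completed = new ArrayList<>();
        final ByteBuffer buffered = ByteBuffer.wrap(pending.toByteArray());
        while (buffered.remaining() >= Integer.BYTES) {
            buffered.mark();
            final int length = buffered.getInt();
            if (buffered.remaining() < length) {
                buffered.reset(); // incomplete message: wait for the next chunk
                break;
            }
            final byte[] payload = new byte[length];
            buffered.get(payload);
            completed.add(new Message(payload));
        }
        // Keep only the unconsumed bytes for the next call.
        pending = new ByteArrayOutputStream();
        pending.write(buffered.array(), buffered.position(), buffered.remaining());
        return completed;
    }
}
```

Because the buffering and framing live in one place, each transport implementation (test, netty, nio) can hand off raw bytes and receive fully aggregated messages, which is the point of moving this logic into server.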