Reduce resource needs of join validation #85380
Conversation
Hi @DaveCTurner, I've created a changelog YAML for you.
Fixes a few scalability issues around join validation:

- compresses the cluster state sent over the wire
- shares the serialized cluster state across multiple nodes
- forks the decompression/deserialization work off the transport thread

Relates elastic#77466
Closes elastic#83204
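For illustration, here is a minimal sketch of the serialize-once-and-share idea, not the actual implementation: the class and method names are made up, and it uses plain `java.util.zip` compression rather than Elasticsearch's transport compression. The point is that the expensive serialize-and-compress step happens once and the same bytes are handed to every pending join-validation request.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.function.Supplier;
import java.util.zip.DeflaterOutputStream;

// Hypothetical sketch: serialize and compress the cluster state once, then hand the
// same immutable byte[] to every join-validation request sent while the cache is fresh.
final class CachedValidationBytes {
    private final Supplier<byte[]> stateSerializer; // produces the uncompressed state bytes
    private volatile byte[] cached;                 // shared across all outbound requests

    CachedValidationBytes(Supplier<byte[]> stateSerializer) {
        this.stateSerializer = stateSerializer;
    }

    byte[] getOrCompute() {
        byte[] bytes = cached;
        if (bytes == null) {
            synchronized (this) {
                if (cached == null) {
                    cached = compress(stateSerializer.get());
                }
                bytes = cached;
            }
        }
        return bytes;
    }

    private static byte[] compress(byte[] raw) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (DeflaterOutputStream deflater = new DeflaterOutputStream(out)) {
            deflater.write(raw); // compressed once, reused for every target node
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toByteArray();
    }
}
```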
Still needs work on the tests (see the `AwaitsFix` annotations) but I'm opening this to get some CI attention. I doubt I'll get this in for 8.2.
Pinging @elastic/es-distributed (Team:Distributed)
Think this is good to go now. Note that it changes the semantics of join validation slightly: we obviously don't send the latest state each time any more; it might be up to 60s stale. Also we used to be certain that the sender was marked as the master in the state we sent, but that's no longer the case since we get the state twice. I don't think these are meaningful changes, but worth pointing out anyway.
```java
            return ClusterState.readFrom(input, null);
        }
    } finally {
        IOUtils.close(in);
```
I believe `in` might already be closed by the `try (StreamInput input = in)` line in the case where there's no exception. I wonder if it is possible to simplify the code to use a single try-with-resources instead of a nested one.
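For what it's worth, a sketch of the simplification being suggested, assuming the `StreamInput`/`ClusterState` types from the quoted hunk (the class and method names are made up): a single try-with-resources closes the underlying stream exactly once, so there is no separate `finally` re-closing `in`.

```java
import java.io.IOException;

import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.io.stream.StreamInput;

// Hypothetical shape of the suggestion: let one try-with-resources own the stream
// rather than closing `in` again in a finally block after the inner try has closed it.
class SingleTrySketch {
    static ClusterState readState(StreamInput in) throws IOException {
        try (StreamInput input = in) {
            return ClusterState.readFrom(input, null);
        } // `input` (and therefore `in`) is closed here exactly once
    }
}
```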
```java
}

public ClusterState getState() throws IOException {
    return stateSupplier.get();
```
Could this be called multiple times? If so then the input would be read more than once. Should we consider `LazyInitializable` here?
It is only called once so I think this is fine. I renamed it in fd908df to `getOrReadState` just to make it a bit clearer that this might be more than a simple getter.
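As an aside, a generic sketch of the memoize-once idea behind that question (the wrapper class here is made up, standing in for something like `LazyInitializable`): caching the first result would make repeated calls safe, even though, as noted, the method is only called once in practice.

```java
import java.io.IOException;

// Made-up wrapper illustrating the concern: memoize the first read so that calling
// getOrRead() twice would not try to re-read (and re-decompress) the underlying stream.
final class MemoizedState<T> {
    interface CheckedSupplier<T> {
        T get() throws IOException;
    }

    private final CheckedSupplier<T> reader;
    private T value;

    MemoizedState(CheckedSupplier<T> reader) {
        this.reader = reader;
    }

    synchronized T getOrRead() throws IOException {
        if (value == null) {
            value = reader.get(); // reads the underlying stream at most once
        }
        return value;
    }
}
```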
```java
} finally {
    if (success == false) {
        assert false;
        bytesStream.close();
    }
}
```
Could this be converted to an unconditional catch or am I missing something?
I think it's better to close resources in a `finally` like this. For instance this will close the stream in tests even if an assertion trips, which we need to do to avoid tripping the leak detector later on. To get the same effect with a catch block we'd need to `catch (Throwable t)`, which is not permitted.
(on closer inspection I see that on an assertion error we would leak the rest of the queue anyway, see 3af680b)
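To make the trade-off concrete, here is a standalone sketch of the close-in-finally-on-failure pattern (the `Resource` type is a stand-in, not the real `bytesStream`): the `finally` block runs even when an `AssertionError` is thrown, which a `catch (Exception e)` block would never see.

```java
// Illustrative stand-in for the pattern in the quoted hunk: release the resource in a
// finally block when we did not reach the success path. Unlike `catch (Exception e)`,
// the finally block also runs for AssertionError, so tests cannot leak the resource.
final class Resource implements AutoCloseable {
    @Override
    public void close() { /* release buffers, decrement ref counts, ... */ }
}

final class FinallyOnFailure {
    static Resource acquireAndFill() {
        Resource resource = new Resource();
        boolean success = false;
        try {
            // ... populate the resource, possibly tripping an assertion in tests ...
            success = true;
            return resource;
        } finally {
            if (success == false) {
                resource.close(); // reached on AssertionError too, avoiding the leak detector
            }
        }
    }
}
```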
Nice one! Thanks David, sorry this took me so long to get to.
Just a couple of random comments. The doc one is the only one that is somewhat important I think :)
```java
    stream.setVersion(version);
    clusterState.writeTo(stream);
} catch (IOException e) {
    throw new ElasticsearchException("failed to serialize cluster state for publishing to node {}", e, discoveryNode);
```
Shouldn't this be an `assert false;`? We should never have a state that we fail to serialize to some version?
Mmm possibly, especially with an `IOException`, but in general exceptions during serialization are the last line of defence if there's some new feature that simply can't be represented in an older version. It's definitely better to avoid using new features in mixed clusters where possible but it's not a hard constraint.
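As a made-up example of that last line of defence (the field, version constant, and stream interface below are all invented for illustration): a value with no wire representation before some version either gets omitted or turns into a serialization exception, which the caller reports as a failure rather than treating as a programming error.

```java
import java.io.IOException;

// Invented example: a field introduced in a newer version either has to be omitted when
// writing to an older node or must fail serialization with a clear exception, which the
// caller reports as a publish/validation failure rather than asserting on.
final class ExampleWriteable {
    private static final int V_8_3_0 = 8_03_00; // stand-in version constant

    private final String newOptionalField; // only representable from V_8_3_0 onwards

    ExampleWriteable(String newOptionalField) {
        this.newOptionalField = newOptionalField;
    }

    void writeTo(VersionedOutput out) throws IOException {
        if (out.version() >= V_8_3_0) {
            out.writeOptionalString(newOptionalField);
        } else if (newOptionalField != null) {
            // cannot be represented on the older wire format: surface it as an exception
            throw new IOException("field requires a version of at least " + V_8_3_0);
        }
        // else: older target and the field is unset, so there is nothing to write
    }

    // minimal stand-in for a versioned stream, just enough for the sketch
    interface VersionedOutput {
        int version();
        void writeOptionalString(String s) throws IOException;
    }
}
```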
```diff
     super.writeTo(out);
-    this.state.writeTo(out);
+    stateSupplier.get().writeTo(out);
```
Just a comment, this change is still a winner regardless :)

This is such an unfortunate pattern that we have in a couple of spots. Instead of copying the bytes we should really be able to just use them outright by incrementing the ref count like we do for `BytesTransportRequest` during publication. This effectively doubles the memory use relative to what we actually need for the full state compressed on heap, and it still scales as O(N) in the number of nodes we work with.

I'll see if we can clean this up in some neat way now that we only have Netty to worry about :)
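A generic sketch of the ref-counting alternative being described, in plain Java rather than the real `BytesTransportRequest` machinery (class and method names are illustrative): every outbound request retains the shared buffer and releases it when the send completes, so the compressed state exists once on the heap no matter how many nodes receive it.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Generic illustration of sharing one buffer by reference counting instead of copying:
// each consumer retains before use and releases afterwards; the bytes are freed once.
final class RefCountedBytes {
    private final byte[] bytes;
    private final AtomicInteger refCount = new AtomicInteger(1); // creator holds one ref

    RefCountedBytes(byte[] bytes) {
        this.bytes = bytes;
    }

    byte[] retain() {
        int current;
        do {
            current = refCount.get();
            if (current == 0) {
                throw new IllegalStateException("already released");
            }
        } while (refCount.compareAndSet(current, current + 1) == false);
        return bytes;
    }

    void release() {
        if (refCount.decrementAndGet() == 0) {
            // last reference gone: in the real code this is where pooled pages are recycled
        }
    }
}
```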
NB the `assert out.getVersion().before(Version.V_8_3_0);` here: we only call this if the target node doesn't support the new wire format, i.e. the cluster is in the middle of a rolling upgrade. I don't think it's worth optimising this case.
```java
    public String toString() {
        return cacheClearer + " after timeout";
    }
}, cacheTimeout, ThreadPool.Names.CLUSTER_COORDINATION);
```
This seems cheap enough to just run on the scheduler thread?
It's not guaranteed I think: a concurrent `execute()` call could enqueue an expensive `JoinValidation` before our `cacheClearer`, which we end up running:
```mermaid
sequenceDiagram
actor Us
participant Queue
actor Other thread
Other thread ->> Queue: Add JoinValidation
Us ->> Queue: Add cacheCleaner
Queue ->>+ Us: queueSize.cas(0,1), run tasks
Queue ->> Other thread: queueSize.cas(1, 2), stop
Us ->> Queue: JoinValidation completed, queueSize.cas(2, 1), keep going
Us ->>- Queue: cacheCleaner completed, queueSize.cas(1, 0), stop
```
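For reference, a simplified version of the queue pattern behind that diagram (names are illustrative, not the actual `JoinValidationService` code): whichever thread moves the pending count off zero keeps draining the queue, so the thread that enqueues the cheap `cacheClearer` can end up running an expensive `JoinValidation` that another thread enqueued just before it.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Simplified single-consumer queue: the thread that bumps queueSize off zero drains the
// queue until it drops back to zero, which is why the scheduler thread submitting the
// cheap cacheClearer may end up running an expensive JoinValidation enqueued just before.
final class DrainingExecutor {
    private final Queue<Runnable> queue = new ConcurrentLinkedQueue<>();
    private final AtomicInteger queueSize = new AtomicInteger();

    void execute(Runnable task) {
        queue.add(task);
        if (queueSize.getAndIncrement() == 0) {
            // we moved the count off zero, so we own the drain loop
            do {
                queue.poll().run();
            } while (queueSize.decrementAndGet() > 0);
        }
    }
}
```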
```
@@ -199,6 +199,15 @@ Sets how long the master node waits for each cluster state update to be
completely published to all nodes, unless `discovery.type` is set to
`single-node`. The default value is `30s`. See <<cluster-state-publishing>>.

`cluster.join_validation.cache_timeout`::
(<<static-cluster-setting,Static>>)
```
Do we actually want to document this?
This seems like a really tricky implementation detail for one. Also, maybe we will evolve this functionality further and it becomes meaningless in the mid term and then we have to deal with the whole deprecation noise? :)
We make it a setting just in case there's some unforeseen problem that can be fixed by changing it, and IMO it's a bit rubbish when a problem can be fixed by adjusting a setting that isn't documented. We quite rightly get asked to add docs for such settings later. Note also that this whole section of the docs does have the following disclaimer:
WARNING: If you adjust these settings then your cluster may not form correctly or may become unstable or intolerant of certain failures.
We're ok with making settings become no-ops if the implementation moves on, like e.g. we did with `cluster.join.timeout` in 7.x. That's not a breaking change.
LGTM! I misread a couple of spots before -> stupid questions ... but all clear now :)
Thanks both!
We introduced a new join validation protocol in #85380 (8.3); the legacy protocol can be removed in 9.0. Remove the assertion that we run a version after 8.3.0.
Since 8.3.0 (#85380) we have sent join-validation requests as a `BytesTransportRequest` to facilitate sharing these large messages (and the work needed to create them) amongst all nodes that join the cluster at around the same time. For BwC with versions earlier than 8.3.0 we use a `ValidateJoinRequest` class to represent the received data, whichever scheme it uses. We no longer need to maintain this compatibility, so we can use a bare `BytesTransportRequest` on both sender and receiver, and therefore drop the `ValidateJoinRequest` adapter and the special-cased assertion in `MockTransportService`. Relates #114808 which was reverted in #117200.