Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimising node to node communication by serializing node attribute in DiscoveryNode only in scenarioes where it is required #15617

Merged
merged 1 commit into from
Sep 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -969,9 +969,13 @@ public <T extends Writeable> void writeOptionalArray(@Nullable T[] array) throws
}

public void writeOptionalWriteable(@Nullable Writeable writeable) throws IOException {
writeOptionalWriteable((out, writable) -> writable.writeTo(out), writeable);
}

public <T extends Writeable> void writeOptionalWriteable(final Writer<T> writer, @Nullable T writeable) throws IOException {
if (writeable != null) {
writeBoolean(true);
writeable.writeTo(this);
writer.write(this, writeable);
} else {
writeBoolean(false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public ClusterAllocationExplanation(StreamInput in) throws IOException {
@Override
public void writeTo(StreamOutput out) throws IOException {
shardRouting.writeTo(out);
out.writeOptionalWriteable(currentNode);
out.writeOptionalWriteable((stream, node) -> node.writeToWithAttribute(stream), currentNode);
out.writeOptionalWriteable(relocationTargetNode);
out.writeOptionalWriteable(clusterInfo);
shardAllocationDecision.writeTo(out);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,6 @@ public DiscoveryNode getNode() {

@Override
public void writeTo(StreamOutput out) throws IOException {
node.writeTo(out);
node.writeToWithAttribute(out);
}
}
14 changes: 12 additions & 2 deletions server/src/main/java/org/opensearch/cluster/ClusterState.java
Original file line number Diff line number Diff line change
Expand Up @@ -781,7 +781,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeString(stateUUID);
metadata.writeTo(out);
routingTable.writeTo(out);
nodes.writeTo(out);
nodes.writeToWithAttribute(out);
blocks.writeTo(out);
// filter out custom states not supported by the other node
int numberOfCustoms = 0;
Expand Down Expand Up @@ -859,13 +859,23 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeString(toUuid);
out.writeLong(toVersion);
routingTable.writeTo(out);
nodes.writeTo(out);
nodesWriteToWithAttributes(nodes, out);
metadata.writeTo(out);
blocks.writeTo(out);
customs.writeTo(out);
out.writeVInt(minimumClusterManagerNodesOnPublishingClusterManager);
}

private void nodesWriteToWithAttributes(Diff<DiscoveryNodes> nodes, StreamOutput out) throws IOException {
DiscoveryNodes part = nodes.apply(null);
if (part != null) {
out.writeBoolean(true);
part.writeToWithAttribute(out);
} else {
out.writeBoolean(false);
}
}

@Override
public ClusterState apply(ClusterState state) {
Builder builder = new Builder(clusterName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,8 @@ public Join(StreamInput in) throws IOException {

@Override
public void writeTo(StreamOutput out) throws IOException {
sourceNode.writeTo(out);
targetNode.writeTo(out);
sourceNode.writeToWithAttribute(out);
targetNode.writeToWithAttribute(out);
out.writeLong(term);
out.writeLong(lastAcceptedTerm);
out.writeLong(lastAcceptedVersion);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public JoinRequest(StreamInput in) throws IOException {
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
sourceNode.writeTo(out);
sourceNode.writeToWithAttribute(out);
if (out.getVersion().onOrAfter(LegacyESVersion.V_7_7_0)) {
out.writeLong(minimumTerm);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public StartJoinRequest(StreamInput input) throws IOException {
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
sourceNode.writeTo(out);
sourceNode.writeToWithAttribute(out);
out.writeLong(term);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -385,17 +385,34 @@ public DiscoveryNode(StreamInput in) throws IOException {

@Override
public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_2_17_0)) {
writeToUtil(out, false);
} else {
writeToUtil(out, true);
}
}

public void writeToWithAttribute(StreamOutput out) throws IOException {
writeToUtil(out, true);
}

public void writeToUtil(StreamOutput out, boolean includeAllAttributes) throws IOException {
out.writeString(nodeName);
out.writeString(nodeId);
out.writeString(ephemeralId);
out.writeString(hostName);
out.writeString(hostAddress);
address.writeTo(out);
out.writeVInt(attributes.size());
for (Map.Entry<String, String> entry : attributes.entrySet()) {
out.writeString(entry.getKey());
out.writeString(entry.getValue());
if (includeAllAttributes) {
out.writeVInt(attributes.size());
for (Map.Entry<String, String> entry : attributes.entrySet()) {
out.writeString(entry.getKey());
out.writeString(entry.getValue());
}
} else {
out.writeVInt(0);
}

if (out.getVersion().onOrAfter(LegacyESVersion.V_7_3_0)) {
out.writeVInt(roles.size());
for (final DiscoveryNodeRole role : roles) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -701,6 +701,14 @@

@Override
public void writeTo(StreamOutput out) throws IOException {
writeToUtil((output, value) -> value.writeTo(output), out);
}

Check warning on line 705 in server/src/main/java/org/opensearch/cluster/node/DiscoveryNodes.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/cluster/node/DiscoveryNodes.java#L704-L705

Added lines #L704 - L705 were not covered by tests

public void writeToWithAttribute(StreamOutput out) throws IOException {
writeToUtil((output, value) -> value.writeToWithAttribute(output), out);
}

private void writeToUtil(final Writer<DiscoveryNode> writer, StreamOutput out) throws IOException {
if (clusterManagerNodeId == null) {
out.writeBoolean(false);
} else {
Expand All @@ -709,7 +717,7 @@
}
out.writeVInt(nodes.size());
for (DiscoveryNode node : this) {
node.writeTo(out);
writer.write(out, node);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ public List<NodeAllocationResult> getNodeDecisions() {

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalWriteable(targetNode);
out.writeOptionalWriteable((stream, node) -> node.writeToWithAttribute(stream), targetNode);
if (nodeDecisions != null) {
out.writeBoolean(true);
out.writeList(nodeDecisions);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public NodeAllocationResult(StreamInput in) throws IOException {

@Override
public void writeTo(StreamOutput out) throws IOException {
node.writeTo(out);
node.writeToWithAttribute(out);
out.writeOptionalWriteable(shardStoreInfo);
out.writeOptionalWriteable(canAllocateDecision);
nodeDecision.writeTo(out);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,12 @@ public UploadedMetadata getUploadedMetadata() {

@Override
public InputStream serialize() throws IOException {
return DISCOVERY_NODES_FORMAT.serialize(discoveryNodes, generateBlobFileName(), getCompressor()).streamInput();
return DISCOVERY_NODES_FORMAT.serialize(
(out, discoveryNode) -> discoveryNode.writeToWithAttribute(out),
discoveryNodes,
generateBlobFileName(),
getCompressor()
).streamInput();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,7 @@ private void doRecovery(final long recoveryId, final StartRecoveryRequest preExi
logger.debug("{} reestablishing recovery from {}", startRequest.shardId(), startRequest.sourceNode());
}
}

transportService.sendRequest(
startRequest.sourceNode(),
actionName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,8 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeLong(recoveryId);
shardId.writeTo(out);
out.writeString(targetAllocationId);
sourceNode.writeTo(out);
targetNode.writeTo(out);
sourceNode.writeToWithAttribute(out);
targetNode.writeToWithAttribute(out);
metadataSnapshot.writeTo(out);
out.writeBoolean(primaryRelocation);
out.writeLong(startingSeqNo);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.core.common.io.stream.Writeable.Writer;
import org.opensearch.core.compress.Compressor;
import org.opensearch.core.compress.CompressorRegistry;
import org.opensearch.gateway.CorruptStateException;
Expand Down Expand Up @@ -56,6 +57,10 @@ public ChecksumWritableBlobStoreFormat(String codec, CheckedFunction<StreamInput
}

public BytesReference serialize(final T obj, final String blobName, final Compressor compressor) throws IOException {
return serialize((out, unSerializedObj) -> unSerializedObj.writeTo(out), obj, blobName, compressor);
}

public BytesReference serialize(final Writer<T> writer, T obj, final String blobName, final Compressor compressor) throws IOException {
try (BytesStreamOutput outputStream = new BytesStreamOutput()) {
try (
OutputStreamIndexOutput indexOutput = new OutputStreamIndexOutput(
Expand All @@ -76,7 +81,7 @@ public void close() throws IOException {
}; StreamOutput stream = new OutputStreamStreamOutput(compressor.threadLocalOutputStream(indexOutputOutputStream));) {
// TODO The stream version should be configurable
stream.setVersion(Version.CURRENT);
obj.writeTo(stream);
writer.write(stream, obj);
}
CodecUtil.writeFooter(indexOutput);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -759,7 +759,7 @@ public HandshakeResponse(StreamInput in) throws IOException {

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalWriteable(discoveryNode);
out.writeOptionalWriteable((stream, node) -> node.writeToWithAttribute(stream), discoveryNode);
clusterName.writeTo(out);
if (out.getVersion().before(Version.V_1_0_0)) {
out.writeVersion(LegacyESVersion.V_7_10_2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,12 @@ public void testGetAsyncReadRunnable_DiscoveryNodes() throws IOException, Interr
DiscoveryNodes discoveryNodes = getDiscoveryNodes();
String fileName = randomAlphaOfLength(10);
when(blobStoreTransferService.downloadBlob(anyIterable(), anyString())).thenReturn(
DISCOVERY_NODES_FORMAT.serialize(discoveryNodes, fileName, compressor).streamInput()
DISCOVERY_NODES_FORMAT.serialize(
(out, discoveryNode) -> discoveryNode.writeToWithAttribute(out),
discoveryNodes,
fileName,
compressor
).streamInput()
);
RemoteDiscoveryNodes remoteObjForDownload = new RemoteDiscoveryNodes(fileName, "cluster-uuid", compressor);
CountDownLatch latch = new CountDownLatch(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1186,7 +1186,12 @@ public void testGetClusterStateUsingDiff() throws IOException {
diffManifestBuilder.discoveryNodesUpdated(true);
manifestBuilder.discoveryNodesMetadata(new UploadedMetadataAttribute(DISCOVERY_NODES, DISCOVERY_NODES_FILENAME));
when(blobContainer.readBlob(DISCOVERY_NODES_FILENAME)).thenAnswer(invocationOnMock -> {
BytesReference bytes = DISCOVERY_NODES_FORMAT.serialize(nodesBuilder.build(), DISCOVERY_NODES_FILENAME, compressor);
BytesReference bytes = DISCOVERY_NODES_FORMAT.serialize(
(out, nodes) -> nodes.writeToWithAttribute(out),
nodesBuilder.build(),
DISCOVERY_NODES_FILENAME,
compressor
);
return new ByteArrayInputStream(bytes.streamInput().readAllBytes());
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ public void testSerDe() throws IOException {
public void testExceptionDuringSerialization() throws IOException {
DiscoveryNodes nodes = mock(DiscoveryNodes.class);
RemoteDiscoveryNodes remoteObjectForUpload = new RemoteDiscoveryNodes(nodes, METADATA_VERSION, clusterUUID, compressor);
doThrow(new IOException("mock-exception")).when(nodes).writeTo(any());
doThrow(new IOException("mock-exception")).when(nodes).writeToWithAttribute(any());
IOException iea = assertThrows(IOException.class, remoteObjectForUpload::serialize);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,20 @@ public class ChecksumWritableBlobStoreFormatTests extends OpenSearchTestCase {

public void testSerDe() throws IOException {
IndexMetadata indexMetadata = getIndexMetadata();
BytesReference bytesReference = clusterBlocksFormat.serialize(indexMetadata, TEST_BLOB_FILE_NAME, CompressorRegistry.none());
BytesReference bytesReference = clusterBlocksFormat.serialize(
(out, metadata) -> metadata.writeTo(out),
indexMetadata,
TEST_BLOB_FILE_NAME,
CompressorRegistry.none()
);
IndexMetadata readIndexMetadata = clusterBlocksFormat.deserialize(TEST_BLOB_FILE_NAME, bytesReference);
assertThat(readIndexMetadata, is(indexMetadata));
}

public void testSerDeForCompressed() throws IOException {
IndexMetadata indexMetadata = getIndexMetadata();
BytesReference bytesReference = clusterBlocksFormat.serialize(
(out, metadata) -> metadata.writeTo(out),
indexMetadata,
TEST_BLOB_FILE_NAME,
CompressorRegistry.getCompressor(DeflateCompressor.NAME)
Expand Down
Loading