diff --git a/libs/core/src/main/java/org/opensearch/core/common/io/stream/StreamOutput.java b/libs/core/src/main/java/org/opensearch/core/common/io/stream/StreamOutput.java index b7599265aece3..1d0876d79a549 100644 --- a/libs/core/src/main/java/org/opensearch/core/common/io/stream/StreamOutput.java +++ b/libs/core/src/main/java/org/opensearch/core/common/io/stream/StreamOutput.java @@ -969,9 +969,13 @@ public void writeOptionalArray(@Nullable T[] array) throws } public void writeOptionalWriteable(@Nullable Writeable writeable) throws IOException { + writeOptionalWriteable((out, writable) -> writable.writeTo(out), writeable); + } + + public void writeOptionalWriteable(final Writer writer, @Nullable T writeable) throws IOException { if (writeable != null) { writeBoolean(true); - writeable.writeTo(this); + writer.write(this, writeable); } else { writeBoolean(false); } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/allocation/ClusterAllocationExplanation.java b/server/src/main/java/org/opensearch/action/admin/cluster/allocation/ClusterAllocationExplanation.java index 3c8f07613561d..70a223d60069a 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/allocation/ClusterAllocationExplanation.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/allocation/ClusterAllocationExplanation.java @@ -95,7 +95,7 @@ public ClusterAllocationExplanation(StreamInput in) throws IOException { @Override public void writeTo(StreamOutput out) throws IOException { shardRouting.writeTo(out); - out.writeOptionalWriteable(currentNode); + out.writeOptionalWriteable((stream, node) -> node.writeToWithAttribute(stream), currentNode); out.writeOptionalWriteable(relocationTargetNode); out.writeOptionalWriteable(clusterInfo); shardAllocationDecision.writeTo(out); diff --git a/server/src/main/java/org/opensearch/action/support/nodes/BaseNodeResponse.java b/server/src/main/java/org/opensearch/action/support/nodes/BaseNodeResponse.java index 8a4e12567b515..7e10b583ef21a 100644 --- a/server/src/main/java/org/opensearch/action/support/nodes/BaseNodeResponse.java +++ b/server/src/main/java/org/opensearch/action/support/nodes/BaseNodeResponse.java @@ -67,6 +67,6 @@ public DiscoveryNode getNode() { @Override public void writeTo(StreamOutput out) throws IOException { - node.writeTo(out); + node.writeToWithAttribute(out); } } diff --git a/server/src/main/java/org/opensearch/cluster/ClusterState.java b/server/src/main/java/org/opensearch/cluster/ClusterState.java index 9e63f961d241d..fa3979882f5cb 100644 --- a/server/src/main/java/org/opensearch/cluster/ClusterState.java +++ b/server/src/main/java/org/opensearch/cluster/ClusterState.java @@ -781,7 +781,7 @@ public void writeTo(StreamOutput out) throws IOException { out.writeString(stateUUID); metadata.writeTo(out); routingTable.writeTo(out); - nodes.writeTo(out); + nodes.writeToWithAttribute(out); blocks.writeTo(out); // filter out custom states not supported by the other node int numberOfCustoms = 0; @@ -859,13 +859,23 @@ public void writeTo(StreamOutput out) throws IOException { out.writeString(toUuid); out.writeLong(toVersion); routingTable.writeTo(out); - nodes.writeTo(out); + nodesWriteToWithAttributes(nodes, out); metadata.writeTo(out); blocks.writeTo(out); customs.writeTo(out); out.writeVInt(minimumClusterManagerNodesOnPublishingClusterManager); } + private void nodesWriteToWithAttributes(Diff nodes, StreamOutput out) throws IOException { + DiscoveryNodes part = nodes.apply(null); + if (part != null) { + out.writeBoolean(true); + part.writeToWithAttribute(out); + } else { + out.writeBoolean(false); + } + } + @Override public ClusterState apply(ClusterState state) { Builder builder = new Builder(clusterName); diff --git a/server/src/main/java/org/opensearch/cluster/coordination/Join.java b/server/src/main/java/org/opensearch/cluster/coordination/Join.java index 58fa85992ebc8..ce1a234998690 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/Join.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/Join.java @@ -78,8 +78,8 @@ public Join(StreamInput in) throws IOException { @Override public void writeTo(StreamOutput out) throws IOException { - sourceNode.writeTo(out); - targetNode.writeTo(out); + sourceNode.writeToWithAttribute(out); + targetNode.writeToWithAttribute(out); out.writeLong(term); out.writeLong(lastAcceptedTerm); out.writeLong(lastAcceptedVersion); diff --git a/server/src/main/java/org/opensearch/cluster/coordination/JoinRequest.java b/server/src/main/java/org/opensearch/cluster/coordination/JoinRequest.java index 6eb1514ae848c..c598240e2e9f0 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/JoinRequest.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/JoinRequest.java @@ -89,7 +89,7 @@ public JoinRequest(StreamInput in) throws IOException { @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); - sourceNode.writeTo(out); + sourceNode.writeToWithAttribute(out); if (out.getVersion().onOrAfter(LegacyESVersion.V_7_7_0)) { out.writeLong(minimumTerm); } diff --git a/server/src/main/java/org/opensearch/cluster/coordination/StartJoinRequest.java b/server/src/main/java/org/opensearch/cluster/coordination/StartJoinRequest.java index de58eb721b28f..287418aaf378e 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/StartJoinRequest.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/StartJoinRequest.java @@ -64,7 +64,7 @@ public StartJoinRequest(StreamInput input) throws IOException { @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); - sourceNode.writeTo(out); + sourceNode.writeToWithAttribute(out); out.writeLong(term); } diff --git a/server/src/main/java/org/opensearch/cluster/node/DiscoveryNode.java b/server/src/main/java/org/opensearch/cluster/node/DiscoveryNode.java index 81f699b3b8069..1694fc23f511e 100644 --- a/server/src/main/java/org/opensearch/cluster/node/DiscoveryNode.java +++ b/server/src/main/java/org/opensearch/cluster/node/DiscoveryNode.java @@ -385,17 +385,34 @@ public DiscoveryNode(StreamInput in) throws IOException { @Override public void writeTo(StreamOutput out) throws IOException { + if (out.getVersion().onOrAfter(Version.V_2_17_0)) { + writeToUtil(out, false); + } else { + writeToUtil(out, true); + } + } + + public void writeToWithAttribute(StreamOutput out) throws IOException { + writeToUtil(out, true); + } + + public void writeToUtil(StreamOutput out, boolean includeAllAttributes) throws IOException { out.writeString(nodeName); out.writeString(nodeId); out.writeString(ephemeralId); out.writeString(hostName); out.writeString(hostAddress); address.writeTo(out); - out.writeVInt(attributes.size()); - for (Map.Entry entry : attributes.entrySet()) { - out.writeString(entry.getKey()); - out.writeString(entry.getValue()); + if (includeAllAttributes) { + out.writeVInt(attributes.size()); + for (Map.Entry entry : attributes.entrySet()) { + out.writeString(entry.getKey()); + out.writeString(entry.getValue()); + } + } else { + out.writeVInt(0); } + if (out.getVersion().onOrAfter(LegacyESVersion.V_7_3_0)) { out.writeVInt(roles.size()); for (final DiscoveryNodeRole role : roles) { diff --git a/server/src/main/java/org/opensearch/cluster/node/DiscoveryNodes.java b/server/src/main/java/org/opensearch/cluster/node/DiscoveryNodes.java index 203d45c3318c3..33605ec6330cf 100644 --- a/server/src/main/java/org/opensearch/cluster/node/DiscoveryNodes.java +++ b/server/src/main/java/org/opensearch/cluster/node/DiscoveryNodes.java @@ -701,6 +701,14 @@ public String shortSummary() { @Override public void writeTo(StreamOutput out) throws IOException { + writeToUtil((output, value) -> value.writeTo(output), out); + } + + public void writeToWithAttribute(StreamOutput out) throws IOException { + writeToUtil((output, value) -> value.writeToWithAttribute(output), out); + } + + private void writeToUtil(final Writer writer, StreamOutput out) throws IOException { if (clusterManagerNodeId == null) { out.writeBoolean(false); } else { @@ -709,7 +717,7 @@ public void writeTo(StreamOutput out) throws IOException { } out.writeVInt(nodes.size()); for (DiscoveryNode node : this) { - node.writeTo(out); + writer.write(out, node); } } diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/AbstractAllocationDecision.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/AbstractAllocationDecision.java index 59a39b358cb70..614e9f49c8726 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/AbstractAllocationDecision.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/AbstractAllocationDecision.java @@ -107,7 +107,7 @@ public List getNodeDecisions() { @Override public void writeTo(StreamOutput out) throws IOException { - out.writeOptionalWriteable(targetNode); + out.writeOptionalWriteable((stream, node) -> node.writeToWithAttribute(stream), targetNode); if (nodeDecisions != null) { out.writeBoolean(true); out.writeList(nodeDecisions); diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/NodeAllocationResult.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/NodeAllocationResult.java index 4163a5fd4c16f..6b805ca91fa58 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/NodeAllocationResult.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/NodeAllocationResult.java @@ -104,7 +104,7 @@ public NodeAllocationResult(StreamInput in) throws IOException { @Override public void writeTo(StreamOutput out) throws IOException { - node.writeTo(out); + node.writeToWithAttribute(out); out.writeOptionalWriteable(shardStoreInfo); out.writeOptionalWriteable(canAllocateDecision); nodeDecision.writeTo(out); diff --git a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteDiscoveryNodes.java b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteDiscoveryNodes.java index 446207a767009..829036c6d122b 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteDiscoveryNodes.java +++ b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteDiscoveryNodes.java @@ -88,7 +88,12 @@ public UploadedMetadata getUploadedMetadata() { @Override public InputStream serialize() throws IOException { - return DISCOVERY_NODES_FORMAT.serialize(discoveryNodes, generateBlobFileName(), getCompressor()).streamInput(); + return DISCOVERY_NODES_FORMAT.serialize( + (out, discoveryNode) -> discoveryNode.writeToWithAttribute(out), + discoveryNodes, + generateBlobFileName(), + getCompressor() + ).streamInput(); } @Override diff --git a/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java b/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java index 139e7b51d2200..bd600e1204fa8 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java +++ b/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java @@ -295,6 +295,7 @@ private void doRecovery(final long recoveryId, final StartRecoveryRequest preExi logger.debug("{} reestablishing recovery from {}", startRequest.shardId(), startRequest.sourceNode()); } } + transportService.sendRequest( startRequest.sourceNode(), actionName, diff --git a/server/src/main/java/org/opensearch/indices/recovery/StartRecoveryRequest.java b/server/src/main/java/org/opensearch/indices/recovery/StartRecoveryRequest.java index 1df0d3861f686..7309712314acd 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/StartRecoveryRequest.java +++ b/server/src/main/java/org/opensearch/indices/recovery/StartRecoveryRequest.java @@ -144,8 +144,8 @@ public void writeTo(StreamOutput out) throws IOException { out.writeLong(recoveryId); shardId.writeTo(out); out.writeString(targetAllocationId); - sourceNode.writeTo(out); - targetNode.writeTo(out); + sourceNode.writeToWithAttribute(out); + targetNode.writeToWithAttribute(out); metadataSnapshot.writeTo(out); out.writeBoolean(primaryRelocation); out.writeLong(startingSeqNo); diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/ChecksumWritableBlobStoreFormat.java b/server/src/main/java/org/opensearch/repositories/blobstore/ChecksumWritableBlobStoreFormat.java index 0add86ab88a16..88672995f4fd6 100644 --- a/server/src/main/java/org/opensearch/repositories/blobstore/ChecksumWritableBlobStoreFormat.java +++ b/server/src/main/java/org/opensearch/repositories/blobstore/ChecksumWritableBlobStoreFormat.java @@ -28,6 +28,7 @@ import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.core.common.io.stream.Writeable; +import org.opensearch.core.common.io.stream.Writeable.Writer; import org.opensearch.core.compress.Compressor; import org.opensearch.core.compress.CompressorRegistry; import org.opensearch.gateway.CorruptStateException; @@ -56,6 +57,10 @@ public ChecksumWritableBlobStoreFormat(String codec, CheckedFunction unSerializedObj.writeTo(out), obj, blobName, compressor); + } + + public BytesReference serialize(final Writer writer, T obj, final String blobName, final Compressor compressor) throws IOException { try (BytesStreamOutput outputStream = new BytesStreamOutput()) { try ( OutputStreamIndexOutput indexOutput = new OutputStreamIndexOutput( @@ -76,7 +81,7 @@ public void close() throws IOException { }; StreamOutput stream = new OutputStreamStreamOutput(compressor.threadLocalOutputStream(indexOutputOutputStream));) { // TODO The stream version should be configurable stream.setVersion(Version.CURRENT); - obj.writeTo(stream); + writer.write(stream, obj); } CodecUtil.writeFooter(indexOutput); } diff --git a/server/src/main/java/org/opensearch/transport/TransportService.java b/server/src/main/java/org/opensearch/transport/TransportService.java index 59130ec637a52..8792f56ea8da0 100644 --- a/server/src/main/java/org/opensearch/transport/TransportService.java +++ b/server/src/main/java/org/opensearch/transport/TransportService.java @@ -759,7 +759,7 @@ public HandshakeResponse(StreamInput in) throws IOException { @Override public void writeTo(StreamOutput out) throws IOException { - out.writeOptionalWriteable(discoveryNode); + out.writeOptionalWriteable((stream, node) -> node.writeToWithAttribute(stream), discoveryNode); clusterName.writeTo(out); if (out.getVersion().before(Version.V_1_0_0)) { out.writeVersion(LegacyESVersion.V_7_10_2); diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManagerTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManagerTests.java index 4ef459e6657a1..67b1528466a9e 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManagerTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManagerTests.java @@ -140,7 +140,12 @@ public void testGetAsyncReadRunnable_DiscoveryNodes() throws IOException, Interr DiscoveryNodes discoveryNodes = getDiscoveryNodes(); String fileName = randomAlphaOfLength(10); when(blobStoreTransferService.downloadBlob(anyIterable(), anyString())).thenReturn( - DISCOVERY_NODES_FORMAT.serialize(discoveryNodes, fileName, compressor).streamInput() + DISCOVERY_NODES_FORMAT.serialize( + (out, discoveryNode) -> discoveryNode.writeToWithAttribute(out), + discoveryNodes, + fileName, + compressor + ).streamInput() ); RemoteDiscoveryNodes remoteObjForDownload = new RemoteDiscoveryNodes(fileName, "cluster-uuid", compressor); CountDownLatch latch = new CountDownLatch(1); diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java index 835087632d0d0..63a9c1eaba977 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java @@ -1186,7 +1186,12 @@ public void testGetClusterStateUsingDiff() throws IOException { diffManifestBuilder.discoveryNodesUpdated(true); manifestBuilder.discoveryNodesMetadata(new UploadedMetadataAttribute(DISCOVERY_NODES, DISCOVERY_NODES_FILENAME)); when(blobContainer.readBlob(DISCOVERY_NODES_FILENAME)).thenAnswer(invocationOnMock -> { - BytesReference bytes = DISCOVERY_NODES_FORMAT.serialize(nodesBuilder.build(), DISCOVERY_NODES_FILENAME, compressor); + BytesReference bytes = DISCOVERY_NODES_FORMAT.serialize( + (out, nodes) -> nodes.writeToWithAttribute(out), + nodesBuilder.build(), + DISCOVERY_NODES_FILENAME, + compressor + ); return new ByteArrayInputStream(bytes.streamInput().readAllBytes()); }); } diff --git a/server/src/test/java/org/opensearch/gateway/remote/model/RemoteDiscoveryNodesTests.java b/server/src/test/java/org/opensearch/gateway/remote/model/RemoteDiscoveryNodesTests.java index f1bced2bdf855..1b988ee1f37ec 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/model/RemoteDiscoveryNodesTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/model/RemoteDiscoveryNodesTests.java @@ -143,7 +143,7 @@ public void testSerDe() throws IOException { public void testExceptionDuringSerialization() throws IOException { DiscoveryNodes nodes = mock(DiscoveryNodes.class); RemoteDiscoveryNodes remoteObjectForUpload = new RemoteDiscoveryNodes(nodes, METADATA_VERSION, clusterUUID, compressor); - doThrow(new IOException("mock-exception")).when(nodes).writeTo(any()); + doThrow(new IOException("mock-exception")).when(nodes).writeToWithAttribute(any()); IOException iea = assertThrows(IOException.class, remoteObjectForUpload::serialize); } diff --git a/server/src/test/java/org/opensearch/repositories/blobstore/ChecksumWritableBlobStoreFormatTests.java b/server/src/test/java/org/opensearch/repositories/blobstore/ChecksumWritableBlobStoreFormatTests.java index 536df880b2597..c4e53c1eea138 100644 --- a/server/src/test/java/org/opensearch/repositories/blobstore/ChecksumWritableBlobStoreFormatTests.java +++ b/server/src/test/java/org/opensearch/repositories/blobstore/ChecksumWritableBlobStoreFormatTests.java @@ -35,7 +35,12 @@ public class ChecksumWritableBlobStoreFormatTests extends OpenSearchTestCase { public void testSerDe() throws IOException { IndexMetadata indexMetadata = getIndexMetadata(); - BytesReference bytesReference = clusterBlocksFormat.serialize(indexMetadata, TEST_BLOB_FILE_NAME, CompressorRegistry.none()); + BytesReference bytesReference = clusterBlocksFormat.serialize( + (out, metadata) -> metadata.writeTo(out), + indexMetadata, + TEST_BLOB_FILE_NAME, + CompressorRegistry.none() + ); IndexMetadata readIndexMetadata = clusterBlocksFormat.deserialize(TEST_BLOB_FILE_NAME, bytesReference); assertThat(readIndexMetadata, is(indexMetadata)); } @@ -43,6 +48,7 @@ public void testSerDe() throws IOException { public void testSerDeForCompressed() throws IOException { IndexMetadata indexMetadata = getIndexMetadata(); BytesReference bytesReference = clusterBlocksFormat.serialize( + (out, metadata) -> metadata.writeTo(out), indexMetadata, TEST_BLOB_FILE_NAME, CompressorRegistry.getCompressor(DeflateCompressor.NAME)