Skip to content

Commit

Permalink
Fix wiring according to new method signatures
Browse files Browse the repository at this point in the history
  • Loading branch information
Shailendra Singh committed Aug 4, 2024
1 parent d284c32 commit b1781a0
Show file tree
Hide file tree
Showing 6 changed files with 30 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.opensearch.core.index.Index;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

Expand Down Expand Up @@ -76,6 +77,10 @@ public void writeTo(StreamOutput out) throws IOException {
indicesRouting.writeTo(out);
}

public DiffableUtils.MapDiff<String, IndexRoutingTable, Map<String, IndexRoutingTable>> getIndicesRouting() {
return indicesRouting;
}

/**
* Represents a difference between {@link IndexShardRoutingTable} objects that can be serialized and deserialized.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,23 +141,24 @@ public void getAsyncIndexRoutingDiffWriteAction(
String clusterUUID,
long term,
long version,
Map<String, Diff<IndexRoutingTable>> indexRoutingTableDiff,
RoutingTable routingTableBefore,
RoutingTable routingTableAfter,
LatchedActionListener<ClusterMetadataManifest.UploadedMetadata> latchedActionListener
) {
RoutingTableIncrementalDiff routingTableIncrementalDiff = null;//new RoutingTableIncrementalDiff(indexRoutingTableDiff);
RoutingTableIncrementalDiff routingTableIncrementalDiff = new RoutingTableIncrementalDiff(routingTableBefore, routingTableAfter);
RemoteRoutingTableDiff remoteRoutingTableDiff = new RemoteRoutingTableDiff(
routingTableIncrementalDiff,
clusterUUID,
compressor,
term,
version
routingTableIncrementalDiff,
clusterUUID,
compressor,
term,
version
);

ActionListener<Void> completionListener = ActionListener.wrap(
resp -> latchedActionListener.onResponse(remoteRoutingTableDiff.getUploadedMetadata()),
ex -> latchedActionListener.onFailure(
new RemoteStateTransferException("Exception in writing index routing diff to remote store", ex)
)
resp -> latchedActionListener.onResponse(remoteRoutingTableDiff.getUploadedMetadata()),
ex -> latchedActionListener.onFailure(
new RemoteStateTransferException("Exception in writing index routing diff to remote store", ex)
)
);

remoteRoutingTableDiffStore.writeAsync(remoteRoutingTableDiff, completionListener);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ public void getAsyncIndexRoutingDiffWriteAction(
String clusterUUID,
long term,
long version,
Map<String, Diff<IndexRoutingTable>> indexRoutingTableDiff,
RoutingTable routingTableBefore,
RoutingTable routingTableAfter,
LatchedActionListener<ClusterMetadataManifest.UploadedMetadata> latchedActionListener
) {
// noop
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@ void getAsyncIndexRoutingDiffWriteAction(
String clusterUUID,
long term,
long version,
Map<String, Diff<IndexRoutingTable>> indexRoutingTableDiff,
RoutingTable routingTableBefore,
RoutingTable routingTableAfter,
LatchedActionListener<ClusterMetadataManifest.UploadedMetadata> latchedActionListener
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,7 @@ public RemoteClusterStateManifestInfo writeIncrementalMetadata(
clusterStateCustomsDiff.getUpserts(),
updateHashesOfConsistentSettings,
indicesRoutingToUpload,
indexRoutingTableDiffs
previousClusterState.routingTable()
);

// update the map if the metadata was uploaded
Expand Down Expand Up @@ -513,14 +513,14 @@ UploadedMetadataResults writeMetadataInParallel(
Map<String, ClusterState.Custom> clusterStateCustomToUpload,
boolean uploadHashesOfConsistentSettings,
List<IndexRoutingTable> indicesRoutingToUpload,
Map<String, Diff<IndexRoutingTable>> indexRoutingTableDiff
RoutingTable previousRoutingTable
) throws IOException {
assert Objects.nonNull(indexMetadataUploadListeners) : "indexMetadataUploadListeners can not be null";
int totalUploadTasks = indexToUpload.size() + indexMetadataUploadListeners.size() + customToUpload.size()
+ (uploadCoordinationMetadata ? 1 : 0) + (uploadSettingsMetadata ? 1 : 0) + (uploadTemplateMetadata ? 1 : 0)
+ (uploadDiscoveryNodes ? 1 : 0) + (uploadClusterBlock ? 1 : 0) + (uploadTransientSettingMetadata ? 1 : 0)
+ clusterStateCustomToUpload.size() + (uploadHashesOfConsistentSettings ? 1 : 0) + indicesRoutingToUpload.size()
+ (indexRoutingTableDiff != null && !indexRoutingTableDiff.isEmpty() ? 1 : 0);
+ (previousRoutingTable != null && !previousRoutingTable.indicesRouting().isEmpty() ? 1 : 0);
CountDownLatch latch = new CountDownLatch(totalUploadTasks);
List<String> uploadTasks = Collections.synchronizedList(new ArrayList<>(totalUploadTasks));
Map<String, ClusterMetadataManifest.UploadedMetadata> results = new ConcurrentHashMap<>(totalUploadTasks);
Expand Down Expand Up @@ -690,13 +690,14 @@ UploadedMetadataResults writeMetadataInParallel(
listener
);
});
if (indexRoutingTableDiff != null && !indexRoutingTableDiff.isEmpty()) {
if (previousRoutingTable != null && !previousRoutingTable.indicesRouting().isEmpty()) {
uploadTasks.add(RemoteRoutingTableDiff.ROUTING_TABLE_DIFF_FILE);
remoteRoutingTableService.getAsyncIndexRoutingDiffWriteAction(
clusterState.metadata().clusterUUID(),
clusterState.term(),
clusterState.version(),
indexRoutingTableDiff,
previousRoutingTable,
clusterState.getRoutingTable(),
listener
);
}
Expand Down Expand Up @@ -1294,11 +1295,11 @@ ClusterState readClusterStateInParallel(
);
RoutingTableIncrementalDiff routingTableDiff = readIndexRoutingTableDiffResults.get();
if (routingTableDiff != null) {
/*routingTableDiff.getDiffs().forEach((key, diff) -> {
routingTableDiff.getIndicesRouting().getDiffs().forEach((key, diff) -> {
IndexRoutingTable previousIndexRoutingTable = indicesRouting.get(key);
IndexRoutingTable updatedTable = diff.apply(previousIndexRoutingTable);
indicesRouting.put(key, updatedTable);
});*/
});
}
clusterStateBuilder.routingTable(new RoutingTable(manifest.getRoutingTableVersion(), indicesRouting));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,7 @@ public RemoteRoutingTableDiff(String blobName, String clusterUUID, Compressor co
*/
public Map<String, Diff<IndexRoutingTable>> getDiffs() {
assert routingTableIncrementalDiff != null;
//return routingTableIncrementalDiff.getDiffs();
return null;
return routingTableIncrementalDiff.getIndicesRouting().getDiffs();
}

@Override
Expand Down

0 comments on commit b1781a0

Please sign in to comment.