Skip to content

Commit

Permalink
Moving get snapshot requests to listener based async calls
Browse files Browse the repository at this point in the history
Signed-off-by: INDRAJIT BANERJEE <indrajohn7@gmail.com>
  • Loading branch information
indrajohn7 committed Jun 29, 2023
1 parent 544b1ca commit c98845a
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 41 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Change http code for DecommissioningFailedException from 500 to 400 ([#5283](https://github.com/opensearch-project/OpenSearch/pull/5283))
- Improve summary error message for invalid setting updates ([#4792](https://github.com/opensearch-project/OpenSearch/pull/4792))
- Pass localNode info to all plugins on node start ([#7919](https://github.com/opensearch-project/OpenSearch/pull/7919))
- Moving get snapshot requests to listener based async calls ([#8216](https://github.com/opensearch-project/OpenSearch/pull/8216))

### Deprecated

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.util.CollectionUtil;
import org.opensearch.action.ActionListener;
import org.opensearch.action.StepListener;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.action.support.clustermanager.TransportClusterManagerNodeAction;
Expand Down Expand Up @@ -138,57 +139,60 @@ protected void clusterManagerOperation(
currentSnapshots.add(snapshotInfo);
}

final RepositoryData repositoryData;
if (isCurrentSnapshotsOnly(request.snapshots()) == false) {
repositoryData = PlainActionFuture.get(fut -> repositoriesService.getRepositoryData(repository, fut));
for (SnapshotId snapshotId : repositoryData.getSnapshotIds()) {
allSnapshotIds.put(snapshotId.getName(), snapshotId);
final StepListener<RepositoryData> repositoryDataListener = new StepListener<>();
repositoriesService.getRepositoryData(repository, repositoryDataListener);
repositoryDataListener.whenComplete(repositoryData -> {
if (isCurrentSnapshotsOnly(request.snapshots()) == false) {
repositoryData = PlainActionFuture.get(fut -> repositoriesService.getRepositoryData(repository, fut));
for (SnapshotId snapshotId : repositoryData.getSnapshotIds()) {
allSnapshotIds.put(snapshotId.getName(), snapshotId);
}
} else {
repositoryData = null;
}
} else {
repositoryData = null;
}

final Set<SnapshotId> toResolve = new HashSet<>();
if (isAllSnapshots(request.snapshots())) {
toResolve.addAll(allSnapshotIds.values());
} else {
for (String snapshotOrPattern : request.snapshots()) {
if (GetSnapshotsRequest.CURRENT_SNAPSHOT.equalsIgnoreCase(snapshotOrPattern)) {
toResolve.addAll(currentSnapshots.stream().map(SnapshotInfo::snapshotId).collect(Collectors.toList()));
} else if (Regex.isSimpleMatchPattern(snapshotOrPattern) == false) {
if (allSnapshotIds.containsKey(snapshotOrPattern)) {
toResolve.add(allSnapshotIds.get(snapshotOrPattern));
} else if (request.ignoreUnavailable() == false) {
throw new SnapshotMissingException(repository, snapshotOrPattern);
}
} else {
for (Map.Entry<String, SnapshotId> entry : allSnapshotIds.entrySet()) {
if (Regex.simpleMatch(snapshotOrPattern, entry.getKey())) {
toResolve.add(entry.getValue());
final Set<SnapshotId> toResolve = new HashSet<>();
if (isAllSnapshots(request.snapshots())) {
toResolve.addAll(allSnapshotIds.values());
} else {
for (String snapshotOrPattern : request.snapshots()) {
if (GetSnapshotsRequest.CURRENT_SNAPSHOT.equalsIgnoreCase(snapshotOrPattern)) {
toResolve.addAll(currentSnapshots.stream().map(SnapshotInfo::snapshotId).collect(Collectors.toList()));
} else if (Regex.isSimpleMatchPattern(snapshotOrPattern) == false) {
if (allSnapshotIds.containsKey(snapshotOrPattern)) {
toResolve.add(allSnapshotIds.get(snapshotOrPattern));
} else if (request.ignoreUnavailable() == false) {
throw new SnapshotMissingException(repository, snapshotOrPattern);
}
} else {
for (Map.Entry<String, SnapshotId> entry : allSnapshotIds.entrySet()) {
if (Regex.simpleMatch(snapshotOrPattern, entry.getKey())) {
toResolve.add(entry.getValue());
}
}
}
}
}

if (toResolve.isEmpty() && request.ignoreUnavailable() == false && isCurrentSnapshotsOnly(request.snapshots()) == false) {
throw new SnapshotMissingException(repository, request.snapshots()[0]);
if (toResolve.isEmpty() && request.ignoreUnavailable() == false && isCurrentSnapshotsOnly(request.snapshots()) == false) {
throw new SnapshotMissingException(repository, request.snapshots()[0]);
}
}
}

final List<SnapshotInfo> snapshotInfos;
if (request.verbose()) {
snapshotInfos = snapshots(snapshotsInProgress, repository, new ArrayList<>(toResolve), request.ignoreUnavailable());
} else {
if (repositoryData != null) {
// want non-current snapshots as well, which are found in the repository data
snapshotInfos = buildSimpleSnapshotInfos(toResolve, repositoryData, currentSnapshots);
final List<SnapshotInfo> snapshotInfos;
if (request.verbose()) {
snapshotInfos = snapshots(snapshotsInProgress, repository, new ArrayList<>(toResolve), request.ignoreUnavailable());
} else {
// only want current snapshots
snapshotInfos = currentSnapshots.stream().map(SnapshotInfo::basic).collect(Collectors.toList());
CollectionUtil.timSort(snapshotInfos);
if (repositoryData != null) {
// want non-current snapshots as well, which are found in the repository data
snapshotInfos = buildSimpleSnapshotInfos(toResolve, repositoryData, currentSnapshots);
} else {
// only want current snapshots
snapshotInfos = currentSnapshots.stream().map(SnapshotInfo::basic).collect(Collectors.toList());
CollectionUtil.timSort(snapshotInfos);
}
}
}
listener.onResponse(new GetSnapshotsResponse(snapshotInfos));
listener.onResponse(new GetSnapshotsResponse(snapshotInfos));
}, listener::onFailure);
} catch (Exception e) {
listener.onFailure(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@
import org.opensearch.action.admin.cluster.snapshots.delete.DeleteSnapshotAction;
import org.opensearch.action.admin.cluster.snapshots.delete.DeleteSnapshotRequest;
import org.opensearch.action.admin.cluster.snapshots.delete.TransportDeleteSnapshotAction;
import org.opensearch.action.admin.cluster.snapshots.get.GetSnapshotsAction;
import org.opensearch.action.admin.cluster.snapshots.get.TransportGetSnapshotsAction;
import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotAction;
import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequest;
import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
Expand Down Expand Up @@ -2136,6 +2138,17 @@ public void onFailure(final Exception e) {
indexNameExpressionResolver
)
);
actions.put(
GetSnapshotsAction.INSTANCE,
new TransportGetSnapshotsAction(
transportService,
clusterService,
threadPool,
repositoriesService,
actionFilters,
indexNameExpressionResolver
)
);
actions.put(
ClusterStateAction.INSTANCE,
new TransportClusterStateAction(
Expand Down

0 comments on commit c98845a

Please sign in to comment.