Skip to content

Commit

Permalink
Delete stale index routing table files resolves opensearch-project#14162
Browse files Browse the repository at this point in the history
 (opensearch-project#13909). (opensearch-project#14195)

Signed-off-by: Shailendra Singh <singhlhs@amazon.com>
Co-authored-by: Shailendra Singh <singhlhs@amazon.com>
Signed-off-by: kkewwei <kkewwei@163.com>
  • Loading branch information
2 people authored and kkewwei committed Jul 24, 2024
1 parent 54a330c commit dcd0c93
Show file tree
Hide file tree
Showing 8 changed files with 293 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.store.IndexInput;
import org.opensearch.action.LatchedActionListener;
import org.opensearch.cluster.ClusterState;
Expand Down Expand Up @@ -297,4 +298,15 @@ protected void doStart() {
@Override
protected void doStop() {}

@Override
public void deleteStaleIndexRoutingPaths(List<String> stalePaths) throws IOException {
try {
logger.debug(() -> "Deleting stale index routing files from remote - " + stalePaths);
blobStoreRepository.blobStore().blobContainer(BlobPath.cleanPath()).deleteBlobsIgnoringIfNotExists(stalePaths);
} catch (IOException e) {
logger.error(() -> new ParameterizedMessage("Failed to delete some stale index routing paths from {}", stalePaths), e);
throw e;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -74,4 +74,9 @@ protected void doStop() {
protected void doClose() throws IOException {
// noop
}

@Override
public void deleteStaleIndexRoutingPaths(List<String> stalePaths) throws IOException {
// noop
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,4 +61,6 @@ List<ClusterMetadataManifest.UploadedIndexMetadata> getAllUploadedIndicesRouting
List<String> indicesRoutingToDelete
);

public void deleteStaleIndexRoutingPaths(List<String> stalePaths) throws IOException;

}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Strings;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.routing.remote.RemoteRoutingTableService;
import org.opensearch.cluster.service.ClusterApplierService;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.blobstore.BlobMetadata;
Expand Down Expand Up @@ -72,8 +73,13 @@ public class RemoteClusterStateCleanupManager implements Closeable {
private final ThreadPool threadpool;
private final ClusterApplierService clusterApplierService;
private RemoteManifestManager remoteManifestManager;
private final RemoteRoutingTableService remoteRoutingTableService;

public RemoteClusterStateCleanupManager(RemoteClusterStateService remoteClusterStateService, ClusterService clusterService) {
public RemoteClusterStateCleanupManager(
RemoteClusterStateService remoteClusterStateService,
ClusterService clusterService,
RemoteRoutingTableService remoteRoutingTableService
) {
this.remoteClusterStateService = remoteClusterStateService;
this.remoteStateStats = remoteClusterStateService.getStats();
ClusterSettings clusterSettings = clusterService.getClusterSettings();
Expand All @@ -83,6 +89,7 @@ public RemoteClusterStateCleanupManager(RemoteClusterStateService remoteClusterS
// initialize with 0, a cleanup will be done when this node is elected master node and version is incremented more than threshold
this.lastCleanupAttemptStateVersion = 0;
clusterSettings.addSettingsUpdateConsumer(REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING, this::updateCleanupInterval);
this.remoteRoutingTableService = remoteRoutingTableService;
}

void start() {
Expand Down Expand Up @@ -170,6 +177,7 @@ void deleteClusterMetadata(
Set<String> staleManifestPaths = new HashSet<>();
Set<String> staleIndexMetadataPaths = new HashSet<>();
Set<String> staleGlobalMetadataPaths = new HashSet<>();
Set<String> staleIndexRoutingPaths = new HashSet<>();
activeManifestBlobMetadata.forEach(blobMetadata -> {
ClusterMetadataManifest clusterMetadataManifest = remoteManifestManager.fetchRemoteClusterMetadataManifest(
clusterName,
Expand All @@ -192,6 +200,10 @@ void deleteClusterMetadata(
.values()
.forEach(attribute -> filesToKeep.add(attribute.getUploadedFilename()));
}
if (clusterMetadataManifest.getIndicesRouting() != null) {
clusterMetadataManifest.getIndicesRouting()
.forEach(uploadedIndicesRouting -> filesToKeep.add(uploadedIndicesRouting.getUploadedFilename()));
}
});
staleManifestBlobMetadata.forEach(blobMetadata -> {
ClusterMetadataManifest clusterMetadataManifest = remoteManifestManager.fetchRemoteClusterMetadataManifest(
Expand Down Expand Up @@ -221,6 +233,19 @@ void deleteClusterMetadata(
.filter(file -> filesToKeep.contains(file) == false)
.forEach(staleGlobalMetadataPaths::add);
}
if (clusterMetadataManifest.getIndicesRouting() != null) {
clusterMetadataManifest.getIndicesRouting().forEach(uploadedIndicesRouting -> {
if (!filesToKeep.contains(uploadedIndicesRouting.getUploadedFilename())) {
staleIndexRoutingPaths.add(uploadedIndicesRouting.getUploadedFilename());
logger.debug(
() -> new ParameterizedMessage(
"Indices routing paths in stale manifest: {}",
uploadedIndicesRouting.getUploadedFilename()
)
);
}
});
}

clusterMetadataManifest.getIndices().forEach(uploadedIndexMetadata -> {
String fileName = RemoteClusterStateUtils.getFormattedIndexFileName(uploadedIndexMetadata.getUploadedFilename());
Expand All @@ -238,6 +263,15 @@ void deleteClusterMetadata(
deleteStalePaths(new ArrayList<>(staleGlobalMetadataPaths));
deleteStalePaths(new ArrayList<>(staleIndexMetadataPaths));
deleteStalePaths(new ArrayList<>(staleManifestPaths));
try {
remoteRoutingTableService.deleteStaleIndexRoutingPaths(new ArrayList<>(staleIndexRoutingPaths));
} catch (IOException e) {
logger.error(
() -> new ParameterizedMessage("Error while deleting stale index routing files {}", staleIndexRoutingPaths),
e
);
remoteStateStats.indexRoutingFilesCleanupAttemptFailed();
}
} catch (IllegalStateException e) {
logger.error("Error while fetching Remote Cluster Metadata manifests", e);
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,9 +150,9 @@ public RemoteClusterStateService(
this.slowWriteLoggingThreshold = clusterSettings.get(SLOW_WRITE_LOGGING_THRESHOLD);
clusterSettings.addSettingsUpdateConsumer(SLOW_WRITE_LOGGING_THRESHOLD, this::setSlowWriteLoggingThreshold);
this.remoteStateStats = new RemotePersistenceStats();
this.remoteClusterStateCleanupManager = new RemoteClusterStateCleanupManager(this, clusterService);
this.indexMetadataUploadListeners = indexMetadataUploadListeners;
this.remoteRoutingTableService = RemoteRoutingTableServiceFactory.getService(repositoriesService, settings, clusterSettings);
this.remoteClusterStateCleanupManager = new RemoteClusterStateCleanupManager(this, clusterService, remoteRoutingTableService);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,16 @@
*/
public class RemotePersistenceStats extends PersistedStateStats {
static final String CLEANUP_ATTEMPT_FAILED_COUNT = "cleanup_attempt_failed_count";
static final String INDEX_ROUTING_FILES_CLEANUP_ATTEMPT_FAILED_COUNT = "index_routing_files_cleanup_attempt_failed_count";
static final String REMOTE_UPLOAD = "remote_upload";
private AtomicLong cleanupAttemptFailedCount = new AtomicLong(0);

private AtomicLong indexRoutingFilesCleanupAttemptFailedCount = new AtomicLong(0);

public RemotePersistenceStats() {
super(REMOTE_UPLOAD);
addToExtendedFields(CLEANUP_ATTEMPT_FAILED_COUNT, cleanupAttemptFailedCount);
addToExtendedFields(INDEX_ROUTING_FILES_CLEANUP_ATTEMPT_FAILED_COUNT, indexRoutingFilesCleanupAttemptFailedCount);
}

public void cleanUpAttemptFailed() {
Expand All @@ -34,4 +38,12 @@ public void cleanUpAttemptFailed() {
public long getCleanupAttemptFailedCount() {
return cleanupAttemptFailedCount.get();
}

public void indexRoutingFilesCleanupAttemptFailed() {
indexRoutingFilesCleanupAttemptFailedCount.incrementAndGet();
}

public long getIndexRoutingFilesCleanupAttemptFailedCount() {
return indexRoutingFilesCleanupAttemptFailedCount.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,14 @@
import org.junit.Before;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;

import static org.opensearch.cluster.routing.remote.InternalRemoteRoutingTableService.INDEX_ROUTING_FILE_PREFIX;
import static org.opensearch.cluster.routing.remote.InternalRemoteRoutingTableService.INDEX_ROUTING_PATH_TOKEN;
Expand All @@ -65,6 +67,7 @@
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.startsWith;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
Expand All @@ -91,14 +94,12 @@ public void setup() {
.put("node.attr." + REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY, "routing_repository")
.put(FsRepository.REPOSITORIES_COMPRESS_SETTING.getKey(), false)
.build();

blobStoreRepository = mock(BlobStoreRepository.class);
when(blobStoreRepository.getCompressor()).thenReturn(new DeflateCompressor());
blobStore = mock(BlobStore.class);
blobContainer = mock(BlobContainer.class);
when(repositoriesService.repository("routing_repository")).thenReturn(blobStoreRepository);
when(blobStoreRepository.blobStore()).thenReturn(blobStore);

Settings nodeSettings = Settings.builder().put(REMOTE_PUBLICATION_EXPERIMENTAL, "true").build();
FeatureFlags.initializeFeatureFlags(nodeSettings);

Expand Down Expand Up @@ -552,4 +553,28 @@ private BlobPath getPath() {
RemoteStoreEnums.PathHashAlgorithm.FNV_1A_BASE64
);
}

public void testDeleteStaleIndexRoutingPaths() throws IOException {
doNothing().when(blobContainer).deleteBlobsIgnoringIfNotExists(any());
when(blobStore.blobContainer(any())).thenReturn(blobContainer);
List<String> stalePaths = Arrays.asList("path1", "path2");
remoteRoutingTableService.doStart();
remoteRoutingTableService.deleteStaleIndexRoutingPaths(stalePaths);
verify(blobContainer).deleteBlobsIgnoringIfNotExists(stalePaths);
}

public void testDeleteStaleIndexRoutingPathsThrowsIOException() throws IOException {
when(blobStore.blobContainer(any())).thenReturn(blobContainer);
List<String> stalePaths = Arrays.asList("path1", "path2");
// Simulate an IOException
doThrow(new IOException("test exception")).when(blobContainer).deleteBlobsIgnoringIfNotExists(Mockito.anyList());

remoteRoutingTableService.doStart();
IOException thrown = assertThrows(IOException.class, () -> {
remoteRoutingTableService.deleteStaleIndexRoutingPaths(stalePaths);
});
assertEquals("test exception", thrown.getMessage());
verify(blobContainer).deleteBlobsIgnoringIfNotExists(stalePaths);
}

}
Loading

0 comments on commit dcd0c93

Please sign in to comment.