Skip to content

Commit

Permalink
Upload remote paths during index creation or full cluster upload
Browse files Browse the repository at this point in the history
Signed-off-by: Ashish Singh <ssashish@amazon.com>
  • Loading branch information
ashking94 committed Apr 11, 2024
1 parent 6532caa commit 691f8e5
Show file tree
Hide file tree
Showing 4 changed files with 537 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,16 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.index.Index;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedIndexMetadata;
import org.opensearch.index.remote.RemoteIndexPath;
import org.opensearch.index.remote.RemoteStoreEnums.DataCategory;
import org.opensearch.index.remote.RemoteStoreEnums.DataType;
import org.opensearch.index.remote.RemoteStoreEnums.PathHashAlgorithm;
import org.opensearch.index.remote.RemoteStoreEnums.PathType;
import org.opensearch.index.remote.RemoteStoreUtils;
import org.opensearch.index.translog.transfer.BlobStoreTransferService;
import org.opensearch.node.Node;
Expand Down Expand Up @@ -64,6 +70,9 @@
import java.util.stream.Collectors;

import static org.opensearch.gateway.PersistedClusterStateService.SLOW_WRITE_LOGGING_THRESHOLD;
import static org.opensearch.index.remote.RemoteIndexPath.SEGMENT_PATH;
import static org.opensearch.index.remote.RemoteIndexPath.TRANSLOG_PATH;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteDataAttributePresent;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled;

/**
Expand Down Expand Up @@ -122,6 +131,12 @@ public class RemoteClusterStateService implements Closeable {
Metadata::fromXContent
);

public static final ChecksumBlobStoreFormat<RemoteIndexPath> REMOTE_INDEX_PATH_FORMAT = new ChecksumBlobStoreFormat<>(
"remote-index-path",
RemoteIndexPath.FILE_NAME_FORMAT,
RemoteIndexPath::fromXContent
);

/**
* Manifest format compatible with older codec v0, where codec version was missing.
*/
Expand Down Expand Up @@ -163,6 +178,11 @@ public class RemoteClusterStateService implements Closeable {
private BlobStoreRepository blobStoreRepository;
private BlobStoreTransferService blobStoreTransferService;
private volatile TimeValue slowWriteLoggingThreshold;
private BlobStoreRepository translogRepository;
private BlobStoreTransferService translogTransferService;
private BlobStoreRepository segmentRepository;
private BlobStoreTransferService segmentsTransferService;
private final boolean isRemoteDataAttributePresent;

private volatile TimeValue indexMetadataUploadTimeout;
private volatile TimeValue globalMetadataUploadTimeout;
Expand Down Expand Up @@ -206,6 +226,7 @@ public RemoteClusterStateService(
clusterSettings.addSettingsUpdateConsumer(GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING, this::setGlobalMetadataUploadTimeout);
clusterSettings.addSettingsUpdateConsumer(METADATA_MANIFEST_UPLOAD_TIMEOUT_SETTING, this::setMetadataManifestUploadTimeout);
this.remoteStateStats = new RemotePersistenceStats();
this.isRemoteDataAttributePresent = isRemoteDataAttributePresent(settings);
}

private BlobStoreTransferService getBlobStoreTransferService() {
Expand All @@ -215,6 +236,20 @@ private BlobStoreTransferService getBlobStoreTransferService() {
return blobStoreTransferService;
}

private BlobStoreTransferService getTranslogTransferService() {
if (translogTransferService == null) {
translogTransferService = new BlobStoreTransferService(translogRepository.blobStore(), threadpool);
}
return translogTransferService;
}

private BlobStoreTransferService getSegmentsTransferService() {
if (segmentsTransferService == null) {
segmentsTransferService = new BlobStoreTransferService(segmentRepository.blobStore(), threadpool);
}
return segmentsTransferService;
}

/**
* This method uploads entire cluster state metadata to the configured blob store. For now only index metadata upload is supported. This method should be
* invoked by the elected cluster manager when the remote cluster state is enabled.
Expand All @@ -236,7 +271,8 @@ public ClusterMetadataManifest writeFullMetadata(ClusterState clusterState, Stri
// any validations before/after upload ?
final List<UploadedIndexMetadata> allUploadedIndexMetadata = writeIndexMetadataParallel(
clusterState,
new ArrayList<>(clusterState.metadata().indices().values())
new ArrayList<>(clusterState.metadata().indices().values()),
previousClusterUUID
);
final ClusterMetadataManifest manifest = uploadManifest(
clusterState,
Expand Down Expand Up @@ -313,7 +349,7 @@ public ClusterMetadataManifest writeIncrementalMetadata(
.collect(Collectors.toMap(UploadedIndexMetadata::getIndexName, Function.identity()));

List<IndexMetadata> toUpload = new ArrayList<>();

final Map<String, Long> indexNamePreviousVersionMap = new HashMap<>(previousStateIndexMetadataVersionByName);
for (final IndexMetadata indexMetadata : clusterState.metadata().indices().values()) {
final Long previousVersion = previousStateIndexMetadataVersionByName.get(indexMetadata.getIndex().getName());
if (previousVersion == null || indexMetadata.getVersion() != previousVersion) {
Expand All @@ -331,7 +367,11 @@ public ClusterMetadataManifest writeIncrementalMetadata(
previousStateIndexMetadataVersionByName.remove(indexMetadata.getIndex().getName());
}

List<UploadedIndexMetadata> uploadedIndexMetadataList = writeIndexMetadataParallel(clusterState, toUpload);
List<UploadedIndexMetadata> uploadedIndexMetadataList = writeIndexMetadataParallel(
clusterState,
toUpload,
indexNamePreviousVersionMap
);
uploadedIndexMetadataList.forEach(
uploadedIndexMetadata -> allUploadedIndexMetadata.put(uploadedIndexMetadata.getIndexName(), uploadedIndexMetadata)
);
Expand Down Expand Up @@ -439,33 +479,18 @@ private String writeGlobalMetadata(ClusterState clusterState) throws IOException
* @param toUpload list of IndexMetadata to upload
* @return {@code List<UploadedIndexMetadata>} list of IndexMetadata uploaded to remote
*/
private List<UploadedIndexMetadata> writeIndexMetadataParallel(ClusterState clusterState, List<IndexMetadata> toUpload)
throws IOException {
List<Exception> exceptionList = Collections.synchronizedList(new ArrayList<>(toUpload.size()));
final CountDownLatch latch = new CountDownLatch(toUpload.size());
private List<UploadedIndexMetadata> writeIndexMetadataParallel(
ClusterState clusterState,
List<IndexMetadata> toUpload,
List<IndexMetadata> toUploadIndexPath
) throws IOException {
boolean isTranslogSegmentRepoSame = isTranslogSegmentRepoSame();
int latchCount = toUpload.size() + (isTranslogSegmentRepoSame ? toUploadIndexPath.size() : 2 * toUploadIndexPath.size());
List<Exception> exceptionList = Collections.synchronizedList(new ArrayList<>(latchCount));
final CountDownLatch latch = new CountDownLatch(latchCount);
List<UploadedIndexMetadata> result = new ArrayList<>(toUpload.size());

LatchedActionListener<UploadedIndexMetadata> latchedActionListener = new LatchedActionListener<>(
ActionListener.wrap((UploadedIndexMetadata uploadedIndexMetadata) -> {
logger.trace(
String.format(Locale.ROOT, "IndexMetadata uploaded successfully for %s", uploadedIndexMetadata.getIndexName())
);
result.add(uploadedIndexMetadata);
}, ex -> {
assert ex instanceof RemoteStateTransferException;
logger.error(
() -> new ParameterizedMessage("Exception during transfer of IndexMetadata to Remote {}", ex.getMessage()),
ex
);
exceptionList.add(ex);
}),
latch
);

for (IndexMetadata indexMetadata : toUpload) {
// 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/index/ftqsCnn9TgOX/metadata_4_1690947200
writeIndexMetadataAsync(clusterState, indexMetadata, latchedActionListener);
}
uploadIndexMetadataAsync(clusterState, result, toUpload, latch, exceptionList);
uploadIndexPathAsync(toUploadIndexPath, latch, isTranslogSegmentRepoSame, exceptionList);

try {
if (latch.await(getIndexMetadataUploadTimeout().millis(), TimeUnit.MILLISECONDS) == false) {
Expand Down Expand Up @@ -506,6 +531,190 @@ private List<UploadedIndexMetadata> writeIndexMetadataParallel(ClusterState clus
return result;
}

private void uploadIndexPathAsync(
List<IndexMetadata> toUploadIndexPath,
CountDownLatch latch,
boolean isTranslogSegmentRepoSame,
List<Exception> exceptionList
) throws IOException {
for (IndexMetadata indexMetadata : toUploadIndexPath) {
writeIndexPathAsync(indexMetadata, latch, isTranslogSegmentRepoSame, exceptionList);
}
}

private void writeIndexPathAsync(
IndexMetadata idxMD,
CountDownLatch latch,
boolean isTranslogSegmentRepoSame,
List<Exception> exceptionList
) throws IOException {
Map<String, String> remoteCustomData = idxMD.getCustomData(IndexMetadata.REMOTE_STORE_CUSTOM_KEY);
PathType pathType = PathType.valueOf(remoteCustomData.get(PathType.NAME));
PathHashAlgorithm hashAlgorithm = PathHashAlgorithm.valueOf(remoteCustomData.get(PathHashAlgorithm.NAME));
String indexUUID = idxMD.getIndexUUID();
int shardCount = idxMD.getNumberOfShards();
BlobPath translogBasePath = translogRepository.basePath();
BlobContainer translogBlobContainer = translogRepository.blobStore().blobContainer(translogBasePath.add(RemoteIndexPath.DIR));

if (isTranslogSegmentRepoSame) {
// If the repositories are same, then we need to upload a single file containing paths for both translog and segments.
Map<DataCategory, List<DataType>> pathCreationMap = new HashMap<>();
pathCreationMap.putAll(TRANSLOG_PATH);
pathCreationMap.putAll(SEGMENT_PATH);
REMOTE_INDEX_PATH_FORMAT.writeAsyncWithUrgentPriority(
new RemoteIndexPath(indexUUID, shardCount, translogBasePath, pathType, hashAlgorithm, pathCreationMap),
translogBlobContainer,
indexUUID,
translogRepository.getCompressor(),
getUploadPathLatchedActionListener(idxMD, latch, exceptionList, pathCreationMap),
FORMAT_PARAMS,
true,
XContentType.JSON
);
} else {
// If the repositories are different, then we need to upload one file per segment and translog containing their individual
// paths.
REMOTE_INDEX_PATH_FORMAT.writeAsyncWithUrgentPriority(
new RemoteIndexPath(indexUUID, shardCount, translogBasePath, pathType, hashAlgorithm, TRANSLOG_PATH),
translogBlobContainer,
indexUUID,
translogRepository.getCompressor(),
getUploadPathLatchedActionListener(idxMD, latch, exceptionList, TRANSLOG_PATH),
FORMAT_PARAMS,
true,
XContentType.JSON
);

BlobPath segmentBasePath = segmentRepository.basePath();
BlobContainer segmentBlobContainer = segmentRepository.blobStore().blobContainer(segmentBasePath.add(RemoteIndexPath.DIR));
REMOTE_INDEX_PATH_FORMAT.writeAsyncWithUrgentPriority(
new RemoteIndexPath(indexUUID, shardCount, segmentBasePath, pathType, hashAlgorithm, SEGMENT_PATH),
segmentBlobContainer,
indexUUID,
segmentRepository.getCompressor(),
getUploadPathLatchedActionListener(idxMD, latch, exceptionList, SEGMENT_PATH),
FORMAT_PARAMS,
true,
XContentType.JSON
);
}
}

private LatchedActionListener<Void> getUploadPathLatchedActionListener(
IndexMetadata indexMetadata,
CountDownLatch latch,
List<Exception> exceptionList,
Map<DataCategory, List<DataType>> pathCreationMap
) {
return new LatchedActionListener<>(
ActionListener.wrap(
resp -> logger.trace(
new ParameterizedMessage("Index path uploaded for {} indexMetadata={}", pathCreationMap, indexMetadata)
),
ex -> {
logger.error(
new ParameterizedMessage(
"Exception during Index path upload for {} indexMetadata={}",
pathCreationMap,
indexMetadata
),
ex
);
exceptionList.add(ex);
}
),
latch
);
}

private void uploadIndexMetadataAsync(
ClusterState clusterState,
List<UploadedIndexMetadata> result,
List<IndexMetadata> toUpload,
CountDownLatch latch,
List<Exception> exceptionList
) throws IOException {
LatchedActionListener<UploadedIndexMetadata> indexMetadataLatchedActionListener = new LatchedActionListener<>(
ActionListener.wrap((UploadedIndexMetadata uploadedIndexMetadata) -> {
logger.trace(
String.format(Locale.ROOT, "IndexMetadata uploaded successfully for %s", uploadedIndexMetadata.getIndexName())
);
result.add(uploadedIndexMetadata);
}, ex -> {
assert ex instanceof RemoteStateTransferException;
logger.error(
() -> new ParameterizedMessage("Exception during transfer of IndexMetadata to Remote {}", ex.getMessage()),
ex
);
exceptionList.add(ex);
}),
latch
);

for (IndexMetadata indexMetadata : toUpload) {
// 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/index/ftqsCnn9TgOX/metadata_4_1690947200
writeIndexMetadataAsync(clusterState, indexMetadata, indexMetadataLatchedActionListener);
}
}

private boolean isTranslogSegmentRepoSame() {
String translogRepoName = settings.get(
Node.NODE_ATTRIBUTES.getKey() + RemoteStoreNodeAttribute.REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY
);
String segmentRepoName = settings.get(
Node.NODE_ATTRIBUTES.getKey() + RemoteStoreNodeAttribute.REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY
);
return Objects.equals(translogRepoName, segmentRepoName);
}

private List<UploadedIndexMetadata> writeIndexMetadataParallel(
ClusterState clusterState,
List<IndexMetadata> toUpload,
String previousClusterUUID
) throws IOException {
List<IndexMetadata> toUploadIndexPath = Collections.emptyList();
if (ClusterState.UNKNOWN_UUID.equals(previousClusterUUID)) {
toUploadIndexPath = toUpload;
}
return writeIndexMetadataParallel(clusterState, toUpload, toUploadIndexPath);
}

private List<UploadedIndexMetadata> writeIndexMetadataParallel(
ClusterState clusterState,
List<IndexMetadata> toUpload,
Map<String, Long> indexNamePreviousVersionMap
) throws IOException {
List<IndexMetadata> toUploadIndexPath = Collections.emptyList();
if (isRemoteDataAttributePresent) {
toUploadIndexPath = toUpload.stream()
/* If the previous state's index metadata version is null, then this is index creation */
.filter(indexMetadata -> Objects.isNull(indexNamePreviousVersionMap.get(indexMetadata.getIndex().getName())))
/* Checks the condition if the Index path needs to be uploaded or not */
.filter(this::uploadIndexPathFile)
.collect(Collectors.toList());
}
return writeIndexMetadataParallel(clusterState, toUpload, toUploadIndexPath);
}

/**
* This method checks if the index metadata has attributes that calls for uploading the index path for remote store
* uploads. It checks if the remote store path type is {@code HASHED_PREFIX} and returns true if so.
*/
private boolean uploadIndexPathFile(IndexMetadata indexMetadata) {
assert isRemoteDataAttributePresent : "Remote data attributes is expected to be present";
// A cluster will have remote custom metadata only if the cluster is remote store enabled from data side.
Map<String, String> remoteCustomData = indexMetadata.getCustomData(IndexMetadata.REMOTE_STORE_CUSTOM_KEY);
if (Objects.isNull(remoteCustomData) || remoteCustomData.isEmpty()) {
return false;
}
String pathTypeStr = remoteCustomData.get(PathType.NAME);
if (Objects.isNull(pathTypeStr)) {
return false;
}
// We need to upload the path only if the path type for an index is hashed_prefix
return PathType.HASHED_PREFIX == PathType.parseString(pathTypeStr);
}

/**
* Allows async Upload of IndexMetadata to remote
*
Expand Down Expand Up @@ -574,13 +783,32 @@ public void close() throws IOException {

public void start() {
assert isRemoteStoreClusterStateEnabled(settings) == true : "Remote cluster state is not enabled";
final String remoteStoreRepo = settings.get(
Node.NODE_ATTRIBUTES.getKey() + RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY
blobStoreRepository = (BlobStoreRepository) validateAndGetRepository(
Node.NODE_ATTRIBUTES.getKey() + RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY,
"Cluster State"
);
assert remoteStoreRepo != null : "Remote Cluster State repository is not configured";
final Repository repository = repositoriesService.get().repository(remoteStoreRepo);

if (isRemoteDataAttributePresent == false) {
// If remote store data attributes are not present than we skip this.
return;
}
translogRepository = (BlobStoreRepository) validateAndGetRepository(
Node.NODE_ATTRIBUTES.getKey() + RemoteStoreNodeAttribute.REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY,
"Translog"
);
segmentRepository = (BlobStoreRepository) validateAndGetRepository(
Node.NODE_ATTRIBUTES.getKey() + RemoteStoreNodeAttribute.REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY,
"Translog"
);

}

private Repository validateAndGetRepository(String repoSetting, String repoName) {
final String repo = settings.get(repoSetting);
assert repo != null : "Remote " + repoName + " repository is not configured";
final Repository repository = repositoriesService.get().repository(repo);
assert repository instanceof BlobStoreRepository : "Repository should be instance of BlobStoreRepository";
blobStoreRepository = (BlobStoreRepository) repository;
return repository;
}

private ClusterMetadataManifest uploadManifest(
Expand Down Expand Up @@ -825,7 +1053,6 @@ private IndexMetadata getIndexMetadata(String clusterName, String clusterUUID, U
* @return {@link IndexMetadata}
*/
public ClusterState getLatestClusterState(String clusterName, String clusterUUID) {
start();
Optional<ClusterMetadataManifest> clusterMetadataManifest = getLatestClusterMetadataManifest(clusterName, clusterUUID);
if (clusterMetadataManifest.isEmpty()) {
throw new IllegalStateException(
Expand Down
Loading

0 comments on commit 691f8e5

Please sign in to comment.