diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java
index c892b475d71da..4d885e014931c 100644
--- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java
+++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java
@@ -26,10 +26,16 @@
 import org.opensearch.common.settings.Settings;
 import org.opensearch.common.unit.TimeValue;
 import org.opensearch.common.util.io.IOUtils;
+import org.opensearch.common.xcontent.XContentType;
 import org.opensearch.core.action.ActionListener;
 import org.opensearch.core.index.Index;
 import org.opensearch.core.xcontent.ToXContent;
 import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedIndexMetadata;
+import org.opensearch.index.remote.RemoteIndexPath;
+import org.opensearch.index.remote.RemoteStoreEnums.DataCategory;
+import org.opensearch.index.remote.RemoteStoreEnums.DataType;
+import org.opensearch.index.remote.RemoteStoreEnums.PathHashAlgorithm;
+import org.opensearch.index.remote.RemoteStoreEnums.PathType;
 import org.opensearch.index.remote.RemoteStoreUtils;
 import org.opensearch.index.translog.transfer.BlobStoreTransferService;
 import org.opensearch.node.Node;
@@ -64,6 +70,9 @@
 import java.util.stream.Collectors;
 
 import static org.opensearch.gateway.PersistedClusterStateService.SLOW_WRITE_LOGGING_THRESHOLD;
+import static org.opensearch.index.remote.RemoteIndexPath.SEGMENT_PATH;
+import static org.opensearch.index.remote.RemoteIndexPath.TRANSLOG_PATH;
+import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteDataAttributePresent;
 import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled;
 
 /**
@@ -122,6 +131,12 @@ public class RemoteClusterStateService implements Closeable {
         Metadata::fromXContent
     );
 
+    public static final ChecksumBlobStoreFormat<RemoteIndexPath> REMOTE_INDEX_PATH_FORMAT = new ChecksumBlobStoreFormat<>(
+        "remote-index-path",
+        RemoteIndexPath.FILE_NAME_FORMAT,
+        RemoteIndexPath::fromXContent
+    );
+
     /**
      * Manifest format compatible with older codec v0, where codec version was missing.
      */
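
Note: `REMOTE_INDEX_PATH_FORMAT` reuses the existing `ChecksumBlobStoreFormat` plumbing, which resolves a blob name by formatting its name format with the caller-supplied name. A minimal sketch of that resolution for this format, with a made-up index UUID:

```java
import java.util.Locale;

// Illustrative only: how the new format's blob name is derived. The UUID is a
// hypothetical example value.
public class RemoteIndexPathBlobNameExample {
    public static void main(String[] args) {
        String fileNameFormat = "remote_path_%s"; // RemoteIndexPath.FILE_NAME_FORMAT
        String indexUUID = "ftqsCnn9TgOX";        // made-up index UUID
        String blobName = String.format(Locale.ROOT, fileNameFormat, indexUUID);
        System.out.println(blobName); // remote_path_ftqsCnn9TgOX
    }
}
```

`writeIndexPathAsync()` further down passes the index UUID as the name, so each index gets one `remote_path_<uuid>` blob inside the repository's `remote-index-path` directory.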
@@ -163,6 +178,11 @@ public class RemoteClusterStateService implements Closeable {
     private BlobStoreRepository blobStoreRepository;
     private BlobStoreTransferService blobStoreTransferService;
     private volatile TimeValue slowWriteLoggingThreshold;
+    private BlobStoreRepository translogRepository;
+    private BlobStoreTransferService translogTransferService;
+    private BlobStoreRepository segmentRepository;
+    private BlobStoreTransferService segmentsTransferService;
+    private final boolean isRemoteDataAttributePresent;
 
     private volatile TimeValue indexMetadataUploadTimeout;
     private volatile TimeValue globalMetadataUploadTimeout;
@@ -206,6 +226,7 @@ public RemoteClusterStateService(
         clusterSettings.addSettingsUpdateConsumer(GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING, this::setGlobalMetadataUploadTimeout);
         clusterSettings.addSettingsUpdateConsumer(METADATA_MANIFEST_UPLOAD_TIMEOUT_SETTING, this::setMetadataManifestUploadTimeout);
         this.remoteStateStats = new RemotePersistenceStats();
+        this.isRemoteDataAttributePresent = isRemoteDataAttributePresent(settings);
     }
 
     private BlobStoreTransferService getBlobStoreTransferService() {
@@ -215,6 +236,20 @@ private BlobStoreTransferService getBlobStoreTransferService() {
         return blobStoreTransferService;
     }
 
+    private BlobStoreTransferService getTranslogTransferService() {
+        if (translogTransferService == null) {
+            translogTransferService = new BlobStoreTransferService(translogRepository.blobStore(), threadpool);
+        }
+        return translogTransferService;
+    }
+
+    private BlobStoreTransferService getSegmentsTransferService() {
+        if (segmentsTransferService == null) {
+            segmentsTransferService = new BlobStoreTransferService(segmentRepository.blobStore(), threadpool);
+        }
+        return segmentsTransferService;
+    }
+
     /**
      * This method uploads entire cluster state metadata to the configured blob store. For now only index metadata upload is supported. This method should be
      * invoked by the elected cluster manager when the remote cluster state is enabled.
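
Note: the two new getters mirror `getBlobStoreTransferService()`: one `BlobStoreTransferService` per repository, built lazily on first use. A generic sketch of that memoization shape (names are illustrative; like the getters above, it relies on single-threaded access rather than synchronization):

```java
import java.util.function.Supplier;

// Illustrative only: the null-check-then-assign lazy-init shape used by the
// transfer-service getters. Not thread-safe, same as the originals.
class Lazy<T> {
    private final Supplier<T> factory;
    private T value;

    Lazy(Supplier<T> factory) {
        this.factory = factory;
    }

    T get() {
        if (value == null) {
            value = factory.get(); // built once, on first access
        }
        return value;
    }
}
```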
@@ -236,7 +271,8 @@ public ClusterMetadataManifest writeFullMetadata(ClusterState clusterState, Stri
         // any validations before/after upload ?
         final List<UploadedIndexMetadata> allUploadedIndexMetadata = writeIndexMetadataParallel(
             clusterState,
-            new ArrayList<>(clusterState.metadata().indices().values())
+            new ArrayList<>(clusterState.metadata().indices().values()),
+            previousClusterUUID
         );
         final ClusterMetadataManifest manifest = uploadManifest(
             clusterState,
@@ -313,7 +349,7 @@ public ClusterMetadataManifest writeIncrementalMetadata(
             .collect(Collectors.toMap(UploadedIndexMetadata::getIndexName, Function.identity()));
 
         List<IndexMetadata> toUpload = new ArrayList<>();
-
+        final Map<String, Long> indexNamePreviousVersionMap = new HashMap<>(previousStateIndexMetadataVersionByName);
         for (final IndexMetadata indexMetadata : clusterState.metadata().indices().values()) {
             final Long previousVersion = previousStateIndexMetadataVersionByName.get(indexMetadata.getIndex().getName());
             if (previousVersion == null || indexMetadata.getVersion() != previousVersion) {
@@ -331,7 +367,11 @@ public ClusterMetadataManifest writeIncrementalMetadata(
             previousStateIndexMetadataVersionByName.remove(indexMetadata.getIndex().getName());
         }
 
-        List<UploadedIndexMetadata> uploadedIndexMetadataList = writeIndexMetadataParallel(clusterState, toUpload);
+        List<UploadedIndexMetadata> uploadedIndexMetadataList = writeIndexMetadataParallel(
+            clusterState,
+            toUpload,
+            indexNamePreviousVersionMap
+        );
         uploadedIndexMetadataList.forEach(
             uploadedIndexMetadata -> allUploadedIndexMetadata.put(uploadedIndexMetadata.getIndexName(), uploadedIndexMetadata)
        );
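
Note: `writeIncrementalMetadata()` mutates `previousStateIndexMetadataVersionByName` while it iterates (removing every index it sees, so the leftovers can be treated as deletions). That is why the new code snapshots the map up front: the snapshot is what later distinguishes a newly created index from an updated one. A small self-contained illustration with made-up index names:

```java
import java.util.HashMap;
import java.util.Map;

public class NewIndexDetectionExample {
    public static void main(String[] args) {
        Map<String, Long> previousVersions = new HashMap<>();
        previousVersions.put("logs-1", 4L);

        // Snapshot before the incremental-write loop mutates previousVersions.
        Map<String, Long> snapshot = new HashMap<>(previousVersions);

        // The loop removes every index it processes...
        previousVersions.remove("logs-1");

        // ...so index creation is detected against the snapshot instead:
        boolean isNewIndex = snapshot.get("logs-2") == null; // true: absent from previous state
        System.out.println(isNewIndex);
    }
}
```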
@@ -439,33 +479,18 @@ private String writeGlobalMetadata(ClusterState clusterState) throws IOException
      * @param toUpload list of IndexMetadata to upload
      * @return {@code List<UploadedIndexMetadata>} list of IndexMetadata uploaded to remote
      */
-    private List<UploadedIndexMetadata> writeIndexMetadataParallel(ClusterState clusterState, List<IndexMetadata> toUpload)
-        throws IOException {
-        List<Exception> exceptionList = Collections.synchronizedList(new ArrayList<>(toUpload.size()));
-        final CountDownLatch latch = new CountDownLatch(toUpload.size());
+    private List<UploadedIndexMetadata> writeIndexMetadataParallel(
+        ClusterState clusterState,
+        List<IndexMetadata> toUpload,
+        List<IndexMetadata> toUploadIndexPath
+    ) throws IOException {
+        boolean isTranslogSegmentRepoSame = isTranslogSegmentRepoSame();
+        int latchCount = toUpload.size() + (isTranslogSegmentRepoSame ? toUploadIndexPath.size() : 2 * toUploadIndexPath.size());
+        List<Exception> exceptionList = Collections.synchronizedList(new ArrayList<>(latchCount));
+        final CountDownLatch latch = new CountDownLatch(latchCount);
         List<UploadedIndexMetadata> result = new ArrayList<>(toUpload.size());
-
-        LatchedActionListener<UploadedIndexMetadata> latchedActionListener = new LatchedActionListener<>(
-            ActionListener.wrap((UploadedIndexMetadata uploadedIndexMetadata) -> {
-                logger.trace(
-                    String.format(Locale.ROOT, "IndexMetadata uploaded successfully for %s", uploadedIndexMetadata.getIndexName())
-                );
-                result.add(uploadedIndexMetadata);
-            }, ex -> {
-                assert ex instanceof RemoteStateTransferException;
-                logger.error(
-                    () -> new ParameterizedMessage("Exception during transfer of IndexMetadata to Remote {}", ex.getMessage()),
-                    ex
-                );
-                exceptionList.add(ex);
-            }),
-            latch
-        );
-
-        for (IndexMetadata indexMetadata : toUpload) {
-            // 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/index/ftqsCnn9TgOX/metadata_4_1690947200
-            writeIndexMetadataAsync(clusterState, indexMetadata, latchedActionListener);
-        }
+        uploadIndexMetadataAsync(clusterState, result, toUpload, latch, exceptionList);
+        uploadIndexPathAsync(toUploadIndexPath, latch, isTranslogSegmentRepoSame, exceptionList);
 
         try {
             if (latch.await(getIndexMetadataUploadTimeout().millis(), TimeUnit.MILLISECONDS) == false) {
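
Note: the latch count above accounts for two kinds of async uploads: one count-down per index-metadata write, plus one per index-path file, and an index needs two path files when translog and segments live in different repositories. A compact restatement of that arithmetic with example sizes:

```java
// Illustrative arithmetic for the latch count above, with made-up sizes.
public class LatchCountExample {
    static int latchCount(int toUpload, int toUploadIndexPath, boolean sameRepo) {
        return toUpload + (sameRepo ? toUploadIndexPath : 2 * toUploadIndexPath);
    }

    public static void main(String[] args) {
        System.out.println(latchCount(5, 2, true));  // 7: 5 metadata writes + 2 combined path files
        System.out.println(latchCount(5, 2, false)); // 9: 5 metadata writes + 2 translog + 2 segment path files
    }
}
```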
@@ -506,6 +531,190 @@ private List<UploadedIndexMetadata> writeIndexMetadataParallel(ClusterState clus
         return result;
     }
 
+    private void uploadIndexPathAsync(
+        List<IndexMetadata> toUploadIndexPath,
+        CountDownLatch latch,
+        boolean isTranslogSegmentRepoSame,
+        List<Exception> exceptionList
+    ) throws IOException {
+        for (IndexMetadata indexMetadata : toUploadIndexPath) {
+            writeIndexPathAsync(indexMetadata, latch, isTranslogSegmentRepoSame, exceptionList);
+        }
+    }
+
+    private void writeIndexPathAsync(
+        IndexMetadata idxMD,
+        CountDownLatch latch,
+        boolean isTranslogSegmentRepoSame,
+        List<Exception> exceptionList
+    ) throws IOException {
+        Map<String, String> remoteCustomData = idxMD.getCustomData(IndexMetadata.REMOTE_STORE_CUSTOM_KEY);
+        PathType pathType = PathType.valueOf(remoteCustomData.get(PathType.NAME));
+        PathHashAlgorithm hashAlgorithm = PathHashAlgorithm.valueOf(remoteCustomData.get(PathHashAlgorithm.NAME));
+        String indexUUID = idxMD.getIndexUUID();
+        int shardCount = idxMD.getNumberOfShards();
+        BlobPath translogBasePath = translogRepository.basePath();
+        BlobContainer translogBlobContainer = translogRepository.blobStore().blobContainer(translogBasePath.add(RemoteIndexPath.DIR));
+
+        if (isTranslogSegmentRepoSame) {
+            // If the repositories are the same, then we need to upload a single file containing the paths for both
+            // translog and segments.
+            Map<DataCategory, List<DataType>> pathCreationMap = new HashMap<>();
+            pathCreationMap.putAll(TRANSLOG_PATH);
+            pathCreationMap.putAll(SEGMENT_PATH);
+            REMOTE_INDEX_PATH_FORMAT.writeAsyncWithUrgentPriority(
+                new RemoteIndexPath(indexUUID, shardCount, translogBasePath, pathType, hashAlgorithm, pathCreationMap),
+                translogBlobContainer,
+                indexUUID,
+                translogRepository.getCompressor(),
+                getUploadPathLatchedActionListener(idxMD, latch, exceptionList, pathCreationMap),
+                FORMAT_PARAMS,
+                true,
+                XContentType.JSON
+            );
+        } else {
+            // If the repositories are different, then we need to upload one file each for translog and segments,
+            // containing their individual paths.
+            REMOTE_INDEX_PATH_FORMAT.writeAsyncWithUrgentPriority(
+                new RemoteIndexPath(indexUUID, shardCount, translogBasePath, pathType, hashAlgorithm, TRANSLOG_PATH),
+                translogBlobContainer,
+                indexUUID,
+                translogRepository.getCompressor(),
+                getUploadPathLatchedActionListener(idxMD, latch, exceptionList, TRANSLOG_PATH),
+                FORMAT_PARAMS,
+                true,
+                XContentType.JSON
+            );
+
+            BlobPath segmentBasePath = segmentRepository.basePath();
+            BlobContainer segmentBlobContainer = segmentRepository.blobStore().blobContainer(segmentBasePath.add(RemoteIndexPath.DIR));
+            REMOTE_INDEX_PATH_FORMAT.writeAsyncWithUrgentPriority(
+                new RemoteIndexPath(indexUUID, shardCount, segmentBasePath, pathType, hashAlgorithm, SEGMENT_PATH),
+                segmentBlobContainer,
+                indexUUID,
+                segmentRepository.getCompressor(),
+                getUploadPathLatchedActionListener(idxMD, latch, exceptionList, SEGMENT_PATH),
+                FORMAT_PARAMS,
+                true,
+                XContentType.JSON
+            );
+        }
+    }
+
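
Note: `TRANSLOG_PATH` and `SEGMENT_PATH` are keyed by distinct `DataCategory` values, so the two `putAll()` calls simply merge them into one combined map for the single-file case. A self-contained sketch of the merged result (plain strings stand in for the enums; the type lists mirror `RemoteIndexPath`):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of the merge performed above when translog and segment repositories
// are the same: disjoint keys, so putAll() combines the two maps losslessly.
public class PathCreationMapExample {
    public static void main(String[] args) {
        Map<String, List<String>> translogPath = Map.of("translog", List.of("data", "metadata"));
        Map<String, List<String>> segmentPath = Map.of("segments", List.of("data", "metadata", "lock_files"));

        Map<String, List<String>> pathCreationMap = new HashMap<>();
        pathCreationMap.putAll(translogPath);
        pathCreationMap.putAll(segmentPath);

        // e.g. {segments=[data, metadata, lock_files], translog=[data, metadata]}
        // (HashMap iteration order may vary)
        System.out.println(pathCreationMap);
    }
}
```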
+    private LatchedActionListener<Void> getUploadPathLatchedActionListener(
+        IndexMetadata indexMetadata,
+        CountDownLatch latch,
+        List<Exception> exceptionList,
+        Map<DataCategory, List<DataType>> pathCreationMap
+    ) {
+        return new LatchedActionListener<>(
+            ActionListener.wrap(
+                resp -> logger.trace(
+                    new ParameterizedMessage("Index path uploaded for {} indexMetadata={}", pathCreationMap, indexMetadata)
+                ),
+                ex -> {
+                    logger.error(
+                        new ParameterizedMessage(
+                            "Exception during Index path upload for {} indexMetadata={}",
+                            pathCreationMap,
+                            indexMetadata
+                        ),
+                        ex
+                    );
+                    exceptionList.add(ex);
+                }
+            ),
+            latch
+        );
+    }
+
+    private void uploadIndexMetadataAsync(
+        ClusterState clusterState,
+        List<UploadedIndexMetadata> result,
+        List<IndexMetadata> toUpload,
+        CountDownLatch latch,
+        List<Exception> exceptionList
+    ) throws IOException {
+        LatchedActionListener<UploadedIndexMetadata> indexMetadataLatchedActionListener = new LatchedActionListener<>(
+            ActionListener.wrap((UploadedIndexMetadata uploadedIndexMetadata) -> {
+                logger.trace(
+                    String.format(Locale.ROOT, "IndexMetadata uploaded successfully for %s", uploadedIndexMetadata.getIndexName())
+                );
+                result.add(uploadedIndexMetadata);
+            }, ex -> {
+                assert ex instanceof RemoteStateTransferException;
+                logger.error(
+                    () -> new ParameterizedMessage("Exception during transfer of IndexMetadata to Remote {}", ex.getMessage()),
+                    ex
+                );
+                exceptionList.add(ex);
+            }),
+            latch
+        );
+
+        for (IndexMetadata indexMetadata : toUpload) {
+            // 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/index/ftqsCnn9TgOX/metadata_4_1690947200
+            writeIndexMetadataAsync(clusterState, indexMetadata, indexMetadataLatchedActionListener);
+        }
+    }
+
+    private boolean isTranslogSegmentRepoSame() {
+        String translogRepoName = settings.get(
+            Node.NODE_ATTRIBUTES.getKey() + RemoteStoreNodeAttribute.REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY
+        );
+        String segmentRepoName = settings.get(
+            Node.NODE_ATTRIBUTES.getKey() + RemoteStoreNodeAttribute.REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY
+        );
+        return Objects.equals(translogRepoName, segmentRepoName);
+    }
+
+    private List<UploadedIndexMetadata> writeIndexMetadataParallel(
+        ClusterState clusterState,
+        List<IndexMetadata> toUpload,
+        String previousClusterUUID
+    ) throws IOException {
+        List<IndexMetadata> toUploadIndexPath = Collections.emptyList();
+        if (ClusterState.UNKNOWN_UUID.equals(previousClusterUUID)) {
+            toUploadIndexPath = toUpload;
+        }
+        return writeIndexMetadataParallel(clusterState, toUpload, toUploadIndexPath);
+    }
+
+    private List<UploadedIndexMetadata> writeIndexMetadataParallel(
+        ClusterState clusterState,
+        List<IndexMetadata> toUpload,
+        Map<String, Long> indexNamePreviousVersionMap
+    ) throws IOException {
+        List<IndexMetadata> toUploadIndexPath = Collections.emptyList();
+        if (isRemoteDataAttributePresent) {
+            toUploadIndexPath = toUpload.stream()
+                /* If the previous state's index metadata version is null, then this is index creation */
+                .filter(indexMetadata -> Objects.isNull(indexNamePreviousVersionMap.get(indexMetadata.getIndex().getName())))
+                /* Checks whether the index path needs to be uploaded or not */
+                .filter(this::uploadIndexPathFile)
+                .collect(Collectors.toList());
+        }
+        return writeIndexMetadataParallel(clusterState, toUpload, toUploadIndexPath);
+    }
+
+    /**
+     * This method checks if the index metadata has attributes that call for uploading the index path for remote store
+     * uploads. It returns true if the remote store path type is {@code HASHED_PREFIX}.
+     */
+    private boolean uploadIndexPathFile(IndexMetadata indexMetadata) {
+        assert isRemoteDataAttributePresent : "Remote data attributes are expected to be present";
+        // A cluster will have remote custom metadata only if the cluster is remote store enabled on the data side.
+        Map<String, String> remoteCustomData = indexMetadata.getCustomData(IndexMetadata.REMOTE_STORE_CUSTOM_KEY);
+        if (Objects.isNull(remoteCustomData) || remoteCustomData.isEmpty()) {
+            return false;
+        }
+        String pathTypeStr = remoteCustomData.get(PathType.NAME);
+        if (Objects.isNull(pathTypeStr)) {
+            return false;
+        }
+        // We need to upload the path only if the path type for an index is hashed_prefix.
+        return PathType.HASHED_PREFIX == PathType.parseString(pathTypeStr);
+    }
+
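
Note: the `path_type` key and the `HASHED_PREFIX` value that `uploadIndexPathFile()` checks match what the unit test at the end of this diff observes in the serialized output. A standalone sketch of the same gate (string literals stand in for `PathType.NAME` and the enum; `FIXED` is another `PathType` value):

```java
import java.util.Map;
import java.util.Objects;

// Sketch of the gating logic in uploadIndexPathFile(): the path file is only
// written for indices whose remote-store custom metadata declares the
// hashed-prefix path type.
public class UploadIndexPathGateExample {
    static boolean shouldUploadIndexPath(Map<String, String> remoteCustomData) {
        if (Objects.isNull(remoteCustomData) || remoteCustomData.isEmpty()) {
            return false; // not a remote-store-enabled index
        }
        String pathType = remoteCustomData.get("path_type");
        return "HASHED_PREFIX".equals(pathType);
    }

    public static void main(String[] args) {
        System.out.println(shouldUploadIndexPath(Map.of("path_type", "HASHED_PREFIX"))); // true
        System.out.println(shouldUploadIndexPath(Map.of("path_type", "FIXED")));         // false
        System.out.println(shouldUploadIndexPath(Map.of()));                             // false
    }
}
```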
     /**
      * Allows async Upload of IndexMetadata to remote
      *
@@ -574,13 +783,32 @@ public void close() throws IOException {
 
     public void start() {
         assert isRemoteStoreClusterStateEnabled(settings) == true : "Remote cluster state is not enabled";
-        final String remoteStoreRepo = settings.get(
-            Node.NODE_ATTRIBUTES.getKey() + RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY
+        blobStoreRepository = (BlobStoreRepository) validateAndGetRepository(
+            Node.NODE_ATTRIBUTES.getKey() + RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY,
+            "Cluster State"
         );
-        assert remoteStoreRepo != null : "Remote Cluster State repository is not configured";
-        final Repository repository = repositoriesService.get().repository(remoteStoreRepo);
+
+        if (isRemoteDataAttributePresent == false) {
+            // If remote store data attributes are not present, then we skip this.
+            return;
+        }
+        translogRepository = (BlobStoreRepository) validateAndGetRepository(
+            Node.NODE_ATTRIBUTES.getKey() + RemoteStoreNodeAttribute.REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY,
+            "Translog"
+        );
+        segmentRepository = (BlobStoreRepository) validateAndGetRepository(
+            Node.NODE_ATTRIBUTES.getKey() + RemoteStoreNodeAttribute.REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY,
+            "Segment"
+        );
+
+    }
+
+    private Repository validateAndGetRepository(String repoSetting, String repoName) {
+        final String repo = settings.get(repoSetting);
+        assert repo != null : "Remote " + repoName + " repository is not configured";
+        final Repository repository = repositoriesService.get().repository(repo);
         assert repository instanceof BlobStoreRepository : "Repository should be instance of BlobStoreRepository";
-        blobStoreRepository = (BlobStoreRepository) repository;
+        return repository;
     }
 
     private ClusterMetadataManifest uploadManifest(
@@ -825,7 +1053,6 @@ private IndexMetadata getIndexMetadata(String clusterName, String clusterUUID, U
      * @return {@link IndexMetadata}
      */
     public ClusterState getLatestClusterState(String clusterName, String clusterUUID) {
-        start();
         Optional<ClusterMetadataManifest> clusterMetadataManifest = getLatestClusterMetadataManifest(clusterName, clusterUUID);
         if (clusterMetadataManifest.isEmpty()) {
             throw new IllegalStateException(
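
Note: `validateAndGetRepository()` folds three copies of the same lookup-and-assert sequence into one helper. The node-attribute keys are composed from `Node.NODE_ATTRIBUTES` plus the per-repository constants; the sketch below mirrors `isTranslogSegmentRepoSame()` and assumes the conventional `node.attr.remote_store.*.repository` key spellings (an assumption; check the constants) and made-up repository names:

```java
import org.opensearch.common.settings.Settings;

import java.util.Objects;

// Illustrative only: how the repository node attributes resolve and why a
// shared repository name collapses the two path files into one.
public class RepoAttributeExample {
    public static void main(String[] args) {
        Settings settings = Settings.builder()
            .put("node.attr.remote_store.translog.repository", "my-repo")
            .put("node.attr.remote_store.segment.repository", "my-repo")
            .build();
        String translogRepo = settings.get("node.attr.remote_store.translog.repository");
        String segmentRepo = settings.get("node.attr.remote_store.segment.repository");
        // Same repository -> one combined remote-index-path file per index.
        System.out.println(Objects.equals(translogRepo, segmentRepo)); // true
    }
}
```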
diff --git a/server/src/main/java/org/opensearch/index/remote/RemoteIndexPath.java b/server/src/main/java/org/opensearch/index/remote/RemoteIndexPath.java
new file mode 100644
index 0000000000000..83448df427c84
--- /dev/null
+++ b/server/src/main/java/org/opensearch/index/remote/RemoteIndexPath.java
@@ -0,0 +1,138 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.index.remote;
+
+import org.apache.logging.log4j.message.ParameterizedMessage;
+import org.opensearch.common.blobstore.BlobPath;
+import org.opensearch.core.xcontent.ToXContentFragment;
+import org.opensearch.core.xcontent.XContentBuilder;
+import org.opensearch.core.xcontent.XContentParser;
+import org.opensearch.index.remote.RemoteStoreEnums.DataCategory;
+import org.opensearch.index.remote.RemoteStoreEnums.DataType;
+import org.opensearch.index.remote.RemoteStoreEnums.PathHashAlgorithm;
+import org.opensearch.index.remote.RemoteStoreEnums.PathType;
+import org.opensearch.index.remote.RemoteStorePathStrategy.PathInput;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+import static org.opensearch.index.remote.RemoteStoreEnums.DataCategory.SEGMENTS;
+import static org.opensearch.index.remote.RemoteStoreEnums.DataCategory.TRANSLOG;
+import static org.opensearch.index.remote.RemoteStoreEnums.DataType.DATA;
+import static org.opensearch.index.remote.RemoteStoreEnums.DataType.METADATA;
+import static org.opensearch.index.remote.RemoteStorePathStrategy.isCompatible;
+
+/**
+ * Remote index path information.
+ *
+ * @opensearch.internal
+ */
+public class RemoteIndexPath implements ToXContentFragment {
+
+    public static final Map<DataCategory, List<DataType>> TRANSLOG_PATH = Map.of(TRANSLOG, List.of(DATA, METADATA));
+    public static final Map<DataCategory, List<DataType>> SEGMENT_PATH = Map.of(SEGMENTS, List.of(DataType.values()));
+    private static final String DEFAULT_VERSION = "1";
+    public static final String DIR = "remote-index-path";
+    public static final String FILE_NAME_FORMAT = "remote_path_%s";
+    static final String KEY_VERSION = "version";
+    static final String KEY_INDEX_UUID = "index_uuid";
+    static final String KEY_SHARD_COUNT = "shard_count";
+    static final String KEY_PATHS = "paths";
+
+    private final String indexUUID;
+    private final int shardCount;
+    private final Iterable<String> basePath;
+    private final PathType pathType;
+    private final PathHashAlgorithm pathHashAlgorithm;
+
+    /**
+     * This keeps the map of paths that would be present in the content of the index path file. For example, it is
+     * possible that the segment and translog repositories are different. For that use case, we have either segments
+     * or translog as the key, and a list of data, metadata, and lock_files (the latter only for segments) as the value.
+     */
+    private final Map<DataCategory, List<DataType>> pathCreationMap;
+
+    public RemoteIndexPath(
+        String indexUUID,
+        int shardCount,
+        Iterable<String> basePath,
+        PathType pathType,
+        PathHashAlgorithm pathHashAlgorithm,
+        Map<DataCategory, List<DataType>> pathCreationMap
+    ) {
+        if (Objects.isNull(pathCreationMap)
+            || Objects.isNull(pathType)
+            || isCompatible(pathType, pathHashAlgorithm) == false
+            || shardCount < 1
+            || Objects.isNull(basePath)
+            || pathCreationMap.isEmpty()
+            || pathCreationMap.keySet().stream().anyMatch(k -> pathCreationMap.get(k).isEmpty())) {
+            ParameterizedMessage parameterizedMessage = new ParameterizedMessage(
+                "Invalid input in RemoteIndexPath constructor indexUUID={} shardCount={} basePath={} pathType={}"
+                    + " pathHashAlgorithm={} pathCreationMap={}",
+                indexUUID,
+                shardCount,
+                basePath,
+                pathType,
+                pathHashAlgorithm,
+                pathCreationMap
+            );
+            throw new IllegalArgumentException(parameterizedMessage.getFormattedMessage());
+        }
+        boolean validMap = pathCreationMap.keySet()
+            .stream()
+            .allMatch(k -> pathCreationMap.get(k).stream().allMatch(k::isSupportedDataType));
+        if (validMap == false) {
+            throw new IllegalArgumentException(
+                new ParameterizedMessage("pathCreationMap={} has an illegal combination of category and type", pathCreationMap)
+                    .getFormattedMessage()
+            );
+        }
+        this.indexUUID = indexUUID;
+        this.shardCount = shardCount;
+        this.basePath = basePath;
+        this.pathType = pathType;
+        this.pathHashAlgorithm = pathHashAlgorithm;
+        this.pathCreationMap = pathCreationMap;
+    }
+
+    @Override
+    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+        builder.field(KEY_VERSION, DEFAULT_VERSION);
+        builder.field(KEY_INDEX_UUID, indexUUID);
+        builder.field(KEY_SHARD_COUNT, shardCount);
+        builder.field(PathType.NAME, pathType.name());
+        if (Objects.nonNull(pathHashAlgorithm)) {
+            builder.field(PathHashAlgorithm.NAME, pathHashAlgorithm.name());
+        }
+        builder.startArray(KEY_PATHS);
+        for (Map.Entry<DataCategory, List<DataType>> entry : pathCreationMap.entrySet()) {
+            DataCategory dataCategory = entry.getKey();
+            for (DataType type : entry.getValue()) {
+                for (int shardNo = 0; shardNo < shardCount; shardNo++) {
+                    PathInput pathInput = PathInput.builder()
+                        .basePath(new BlobPath().add(basePath))
+                        .indexUUID(indexUUID)
+                        .shardId(Integer.toString(shardNo))
+                        .dataCategory(dataCategory)
+                        .dataType(type)
+                        .build();
+                    builder.value(pathType.path(pathInput, pathHashAlgorithm).buildAsString());
+                }
+            }
+        }
+        builder.endArray();
+        return builder;
+    }
+
+    public static RemoteIndexPath fromXContent(XContentParser ignored) throws IOException {
+        throw new UnsupportedOperationException("RemoteIndexPath.fromXContent() is not supported");
+    }
+}
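
Note: with the unit test at the end of this diff as a reference, the serialized path file is a flat JSON document: version, index UUID, shard count, path type and hash algorithm, plus one `paths` entry per (category, type, shard) combination. A trimmed example of that shape, with values taken from the test (the full array has shard_count x types entries):

```
{
  "version": "1",
  "index_uuid": "test",
  "shard_count": 5,
  "path_type": "HASHED_PREFIX",
  "path_hash_algorithm": "FNV_1A",
  "paths": [
    "7w7J_t5dXgk/dsd/hello/test/0/segments/data/",
    "d2tIsVGsh9I/dsd/hello/test/1/segments/data/",
    "... one entry per (category, type, shard) ..."
  ]
}
```

The constructor above guards this output: it rejects null or empty inputs, a shard count below 1, an incompatible path type and hash algorithm pair, and any category mapped to a data type it does not support.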
diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/ChecksumBlobStoreFormat.java b/server/src/main/java/org/opensearch/repositories/blobstore/ChecksumBlobStoreFormat.java
index 3e6052a5ef820..1be096dd92577 100644
--- a/server/src/main/java/org/opensearch/repositories/blobstore/ChecksumBlobStoreFormat.java
+++ b/server/src/main/java/org/opensearch/repositories/blobstore/ChecksumBlobStoreFormat.java
@@ -57,6 +57,7 @@
 import org.opensearch.core.action.ActionListener;
 import org.opensearch.core.common.bytes.BytesReference;
 import org.opensearch.core.compress.Compressor;
+import org.opensearch.core.xcontent.MediaType;
 import org.opensearch.core.xcontent.MediaTypeRegistry;
 import org.opensearch.core.xcontent.NamedXContentRegistry;
 import org.opensearch.core.xcontent.ToXContent;
@@ -190,9 +191,32 @@ public void write(
         final String name,
         final Compressor compressor,
         final ToXContent.Params params
+    ) throws IOException {
+        write(obj, blobContainer, name, compressor, params, false, XContentType.SMILE);
+    }
+
+    /**
+     * Writes blob with resolving the blob name using {@link #blobName} method.
+     * <p>
+     * The blob will optionally be compressed.
+     *
+     * @param obj              object to be serialized
+     * @param blobContainer    blob container
+     * @param name             blob name
+     * @param compressor       compressor to use
+     * @param params           ToXContent params
+     * @param skipHeaderFooter whether to skip writing the Lucene codec header and footer
+     * @param mediaType        media type used to serialize the object
+     */
+    public void write(
+        final T obj,
+        final BlobContainer blobContainer,
+        final String name,
+        final Compressor compressor,
+        final ToXContent.Params params,
+        boolean skipHeaderFooter,
+        MediaType mediaType
     ) throws IOException {
         final String blobName = blobName(name);
-        final BytesReference bytes = serialize(obj, blobName, compressor, params);
+        final BytesReference bytes = serialize(obj, blobName, compressor, params, skipHeaderFooter, mediaType);
         blobContainer.writeBlob(blobName, bytes.streamInput(), bytes.length(), false);
     }
 
@@ -208,7 +232,17 @@ public void writeAsync(
         final ToXContent.Params params
     ) throws IOException {
         // use NORMAL priority by default
-        this.writeAsyncWithPriority(obj, blobContainer, name, compressor, WritePriority.NORMAL, listener, params);
+        this.writeAsyncWithPriority(
+            obj,
+            blobContainer,
+            name,
+            compressor,
+            WritePriority.NORMAL,
+            listener,
+            params,
+            false,
+            XContentType.SMILE
+        );
     }
 
     /**
@@ -226,7 +260,30 @@ public void writeAsyncWithUrgentPriority(
         ActionListener<Void> listener,
         final ToXContent.Params params
     ) throws IOException {
-        this.writeAsyncWithPriority(obj, blobContainer, name, compressor, WritePriority.URGENT, listener, params);
+        this.writeAsyncWithPriority(
+            obj,
+            blobContainer,
+            name,
+            compressor,
+            WritePriority.URGENT,
+            listener,
+            params,
+            false,
+            XContentType.SMILE
+        );
+    }
+
+    public void writeAsyncWithUrgentPriority(
+        final T obj,
+        final BlobContainer blobContainer,
+        final String name,
+        final Compressor compressor,
+        ActionListener<Void> listener,
+        final ToXContent.Params params,
+        boolean skipHeaderFooter,
+        MediaType type
+    ) throws IOException {
+        this.writeAsyncWithPriority(obj, blobContainer, name, compressor, WritePriority.URGENT, listener, params, skipHeaderFooter, type);
     }
 
     /**
@@ -248,15 +305,17 @@ private void writeAsyncWithPriority(
         final Compressor compressor,
         final WritePriority priority,
         ActionListener<Void> listener,
-        final ToXContent.Params params
+        final ToXContent.Params params,
+        boolean skipHeaderFooter,
+        MediaType mediaType
     ) throws IOException {
         if (blobContainer instanceof AsyncMultiStreamBlobContainer == false) {
-            write(obj, blobContainer, name, compressor, params);
+            write(obj, blobContainer, name, compressor, params, skipHeaderFooter, mediaType);
             listener.onResponse(null);
             return;
         }
         final String blobName = blobName(name);
-        final BytesReference bytes = serialize(obj, blobName, compressor, params);
+        final BytesReference bytes = serialize(obj, blobName, compressor, params, skipHeaderFooter, mediaType);
         final String resourceDescription = "ChecksumBlobStoreFormat.writeAsyncWithPriority(blob=\"" + blobName + "\")";
         try (IndexInput input = new ByteArrayIndexInput(resourceDescription, BytesReference.toBytes(bytes))) {
             long expectedChecksum;
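
Note: existing callers keep the SMILE body wrapped in a Lucene codec header/footer with a checksum (the defaults `false` and `XContentType.SMILE` thread through every old entry point), while the remote-index-path write opts into plain JSON with no header/footer, so the stored blob is directly human-readable. A minimal sketch of building such a JSON body, mirroring the new unit test's builder usage:

```java
import org.opensearch.core.xcontent.MediaTypeRegistry;
import org.opensearch.core.xcontent.XContentBuilder;

// Illustrative only: without the codec header/footer, the serialized bytes are
// the JSON document itself, readable straight out of the blob store.
public class PlainJsonBlobExample {
    public static void main(String[] args) throws Exception {
        XContentBuilder builder = MediaTypeRegistry.contentBuilder(MediaTypeRegistry.JSON);
        builder.startObject();
        builder.field("version", "1"); // same field shape RemoteIndexPath emits
        builder.endObject();
        System.out.println(builder.toString()); // {"version":"1"}
    }
}
```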
@@ -290,6 +349,17 @@ private void writeAsyncWithPriority(
 
     public BytesReference serialize(final T obj, final String blobName, final Compressor compressor, final ToXContent.Params params)
         throws IOException {
+        return serialize(obj, blobName, compressor, params, false, XContentType.SMILE);
+    }
+
+    public BytesReference serialize(
+        final T obj,
+        final String blobName,
+        final Compressor compressor,
+        final ToXContent.Params params,
+        boolean skipHeaderFooter,
+        MediaType type
+    ) throws IOException {
         try (BytesStreamOutput outputStream = new BytesStreamOutput()) {
             try (
                 OutputStreamIndexOutput indexOutput = new OutputStreamIndexOutput(
@@ -299,7 +369,9 @@ public BytesReference serialize(final T obj, final String blobName, final Compre
                     BUFFER_SIZE
                 )
             ) {
-                CodecUtil.writeHeader(indexOutput, codec, VERSION);
+                if (skipHeaderFooter == false) {
+                    CodecUtil.writeHeader(indexOutput, codec, VERSION);
+                }
                 try (OutputStream indexOutputOutputStream = new IndexOutputOutputStream(indexOutput) {
                     @Override
                     public void close() throws IOException {
@@ -308,7 +380,7 @@ public void close() throws IOException {
                     }
                 };
                     XContentBuilder builder = MediaTypeRegistry.contentBuilder(
-                        XContentType.SMILE,
+                        type,
                         compressor.threadLocalOutputStream(indexOutputOutputStream)
                     )
                 ) {
@@ -316,7 +388,9 @@ public void close() throws IOException {
                     obj.toXContent(builder, params);
                     builder.endObject();
                 }
-                CodecUtil.writeFooter(indexOutput);
+                if (skipHeaderFooter == false) {
+                    CodecUtil.writeFooter(indexOutput);
+                }
             }
             return outputStream.bytes();
         }
diff --git a/server/src/test/java/org/opensearch/index/remote/RemoteIndexPathTests.java b/server/src/test/java/org/opensearch/index/remote/RemoteIndexPathTests.java
new file mode 100644
index 0000000000000..42e42bca8e55f
--- /dev/null
+++ b/server/src/test/java/org/opensearch/index/remote/RemoteIndexPathTests.java
@@ -0,0 +1,54 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.index.remote;
+
+import org.opensearch.common.blobstore.BlobPath;
+import org.opensearch.core.xcontent.MediaTypeRegistry;
+import org.opensearch.core.xcontent.ToXContent;
+import org.opensearch.core.xcontent.XContentBuilder;
+import org.opensearch.index.remote.RemoteStoreEnums.PathHashAlgorithm;
+import org.opensearch.index.remote.RemoteStoreEnums.PathType;
+import org.opensearch.test.OpenSearchTestCase;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import static org.opensearch.index.remote.RemoteStoreEnums.DataCategory.SEGMENTS;
+import static org.opensearch.index.remote.RemoteStoreEnums.DataType.DATA;
+import static org.opensearch.index.remote.RemoteStoreEnums.DataType.LOCK_FILES;
+import static org.opensearch.index.remote.RemoteStoreEnums.DataType.METADATA;
+
+public class RemoteIndexPathTests extends OpenSearchTestCase {
+
+    public void testToXContent() throws IOException {
+        RemoteIndexPath indexPath = new RemoteIndexPath(
+            "test",
+            5,
+            new BlobPath().add("dsd").add("hello"),
+            PathType.HASHED_PREFIX,
+            PathHashAlgorithm.FNV_1A,
+            Map.of(SEGMENTS, List.of(DATA, METADATA, LOCK_FILES))
+        );
+        XContentBuilder xContentBuilder = MediaTypeRegistry.contentBuilder(MediaTypeRegistry.JSON);
+        xContentBuilder.startObject();
+        xContentBuilder = indexPath.toXContent(xContentBuilder, ToXContent.EMPTY_PARAMS);
+        xContentBuilder.endObject();
+        String expected = "{\"version\":\"1\",\"index_uuid\":\"test\",\"shard_count\":5,\"path_type\":\"HASHED_PREFIX\""
+            + ",\"path_hash_algorithm\":\"FNV_1A\",\"paths\":[\"7w7J_t5dXgk/dsd/hello/test/0/segments/data/\","
+            + "\"d2tIsVGsh9I/dsd/hello/test/1/segments/data/\",\"jDj7aact8oc/dsd/hello/test/2/segments/data/\","
+            + "\"Ea34xvbcE4g/dsd/hello/test/3/segments/data/\",\"EaMx7uStzM0/dsd/hello/test/4/segments/data/\","
+            + "\"WR3ZZ6pAH9w/dsd/hello/test/0/segments/metadata/\",\"OmAyQrYTH9c/dsd/hello/test/1/segments/metadata/\","
+            + "\"9GFNLB8iu2I/dsd/hello/test/2/segments/metadata/\",\"t4sqq-4QAiU/dsd/hello/test/3/segments/metadata/\","
+            + "\"m5IHfRLxuTg/dsd/hello/test/4/segments/metadata/\",\"1l_zRI9-N0I/dsd/hello/test/0/segments/lock_files/\","
+            + "\"BJxu8oivE1k/dsd/hello/test/1/segments/lock_files/\",\"mkME0l_DbDA/dsd/hello/test/2/segments/lock_files/\","
+            + "\"W7LqEuhiyTc/dsd/hello/test/3/segments/lock_files/\",\"Ls5bj0LgiKY/dsd/hello/test/4/segments/lock_files/\"]}";
+        assertEquals(expected, xContentBuilder.toString());
+    }
+}
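
Note: a possible companion test for the constructor validation (illustrative only, not part of this change; it would live in the same test class and reuse its imports):

```java
// Sketch of a negative test: the RemoteIndexPath constructor rejects invalid
// inputs, such as a non-positive shard count.
public void testInvalidShardCountThrows() {
    expectThrows(
        IllegalArgumentException.class,
        () -> new RemoteIndexPath(
            "test",
            0, // shardCount < 1 fails validation
            new BlobPath().add("dsd").add("hello"),
            PathType.HASHED_PREFIX,
            PathHashAlgorithm.FNV_1A,
            Map.of(SEGMENTS, List.of(DATA))
        )
    );
}
```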